Re: How windowing is implemented on Flink runner

2024-06-12 Thread Robert Bradshaw via user
Beam implements Windowing itself (via state and timers) rather than
deferring to Flink's implementation.

On Wed, Jun 12, 2024 at 11:55 AM Ruben Vargas  wrote:
>
> Hello guys
>
> May be a silly question,
>
> But in the Flink runner, does the window implementation use Flink's
> windowing? Does that mean the runner will have performance issues like
> Flink itself? See this:
> https://issues.apache.org/jira/browse/FLINK-7001
>
> I'm asking because, looking at the issue, it mentions different concepts
> that Beam already handles at the API level. So my suspicion is that
> the Beam model handles windowing a little differently from a pure
> Flink app. But I'm not sure...
>
>
> Regards.


Re: Parallelism of a side input

2024-06-12 Thread Robert Bradshaw via user
On Wed, Jun 12, 2024 at 7:56 AM Ruben Vargas  wrote:
>
> The approach looks good. but one question
>
> My understanding is that this will schedule for example 8 operators across 
> the workers, but only one of them will be processing, the others remain idle? 
> Are those consuming resources in some way? I'm assuming may be is not 
> significant.

That is correct, but the resources consumed by an idle operator should
be negligible.

> Thanks.
>
> El El vie, 7 de jun de 2024 a la(s) 3:56 p.m., Robert Bradshaw via user 
>  escribió:
>>
>> You can always limit the parallelism by assigning a single key to
>> every element and then doing a grouping or reshuffle[1] on that key
>> before processing the elements. Even if the operator parallelism for
>> that step is technically, say, eight, your effective parallelism will
>> be exactly one.
>>
>> [1] 
>> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/transforms/Reshuffle.html
>>
>> On Fri, Jun 7, 2024 at 2:13 PM Ruben Vargas  wrote:
>> >
>> > Hello guys
>> >
>> > One question, I have a side input which fetches an endpoint each 30
>> > min, I pretty much copied the example here:
>> > https://beam.apache.org/documentation/patterns/side-inputs/ but added
>> > some logic to fetch the endpoint and parse the payload.
>> >
>> > My question is: is it possible to control the parallelism of this
>> > single ParDo that does the fetch/transform? I don't think I need a lot
>> > of parallelism for that one. I'm currently using flink runner and I
>> > see the parallelism is 8 (which is the general parallelism for my
>> > flink cluster).
>> >
>> > Is it possible to set it to 1 for example?
>> >
>> >
>> > Regards.


Re: Parallelism of a side input

2024-06-07 Thread Robert Bradshaw via user
You can always limit the parallelism by assigning a single key to
every element and then doing a grouping or reshuffle[1] on that key
before processing the elements. Even if the operator parallelism for
that step is technically, say, eight, your effective parallelism will
be exactly one.

[1] 
https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/transforms/Reshuffle.html
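
For illustration, a minimal Python sketch of that pattern (the source and
fetch_and_parse below are placeholders for your periodic impulse and endpoint
logic, not code from this thread):

import apache_beam as beam

def fetch_and_parse(element):
    # Placeholder: call the endpoint and parse its payload here.
    return element

with beam.Pipeline() as p:
    fetched = (
        p
        | "Ticks" >> beam.Create([1, 2, 3])               # stand-in for the periodic impulse
        | "Single key" >> beam.WithKeys(lambda _: "all")  # every element gets the same key
        | "Group onto one key" >> beam.GroupByKey()       # all values land on a single worker
        | "Fetch endpoint" >> beam.FlatMap(
            lambda kv: (fetch_and_parse(e) for e in kv[1]))
    )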

On Fri, Jun 7, 2024 at 2:13 PM Ruben Vargas  wrote:
>
> Hello guys
>
> One question, I have a side input which fetches an endpoint each 30
> min, I pretty much copied the example here:
> https://beam.apache.org/documentation/patterns/side-inputs/ but added
> some logic to fetch the endpoint and parse the payload.
>
> My question is: is it possible to control the parallelism of this
> single ParDo that does the fetch/transform? I don't think I need a lot
> of parallelism for that one. I'm currently using flink runner and I
> see the parallelism is 8 (which is the general parallelism for my
> flink cluster).
>
> Is it possible to set it to 1 for example?
>
>
> Regards.


Re: Any recommendation for key for GroupIntoBatches

2024-04-15 Thread Robert Bradshaw via user
On Fri, Apr 12, 2024 at 1:39 PM Ruben Vargas 
wrote:

> On Fri, Apr 12, 2024 at 2:17 PM Jaehyeon Kim  wrote:
> >
> > Here is an example from a book that I'm reading now and it may be
> applicable.
> >
> > JAVA - (id.hashCode() & Integer.MAX_VALUE) % 100
> > PYTHON - ord(id[0]) % 100
>

or abs(hash(id)) % 100, in case the first character of your id is not well
distributed.
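
For example, a hedged sketch of the whole pattern (the bucket count, the "id"
field, and the enrich_via_endpoint stub are illustrative, not from this thread):

import apache_beam as beam

NUM_KEYS = 100  # trade-off: more keys -> more parallelism, but smaller batches per key

def enrich_via_endpoint(batch):
    # Placeholder: send the whole batch of ids in a single request and return
    # the enriched records.
    return list(batch)

with beam.Pipeline() as p:
    enriched = (
        p
        | beam.Create([{"id": "msg-%d" % i} for i in range(1000)])
        | "Bucket by hashed id" >> beam.WithKeys(
            lambda msg: abs(hash(msg["id"])) % NUM_KEYS)
        | "Form batches of 100" >> beam.GroupIntoBatches(100)
        | "Call endpoint per batch" >> beam.MapTuple(
            lambda bucket, batch: enrich_via_endpoint(batch))
    )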


> Maybe this is what I'm looking for. I'll give it a try. Thanks!
>
> >
> > On Sat, 13 Apr 2024 at 06:12, George Dekermenjian 
> wrote:
> >>
> >> How about just keeping track of a buffer, flushing the buffer after 100
> messages, and flushing any remaining buffer in finish_bundle as well?
> >>
> >>
>
> If this is in memory, it could lead to potential loss of data. That is
> why state is used, or at least that is my understanding. But maybe
> there is a way to do this in state?
>

Bundles are the unit of commitment in Beam [1], so finish_bundle won't drop
any data. A possible downside is that, especially in streaming, they may be
small, which would cap the amount of batching you get.

[1] https://beam.apache.org/documentation/runtime/model/#bundling-and-persistence
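
For completeness, a buffering DoFn along the lines George suggested might look
like this sketch (the batch size and the choice to emit leftovers into the
global window with a minimal timestamp are simplifications):

import apache_beam as beam
from apache_beam.transforms.window import GlobalWindow
from apache_beam.utils.timestamp import MIN_TIMESTAMP
from apache_beam.utils.windowed_value import WindowedValue

class BufferingDoFn(beam.DoFn):
    """Buffers elements and emits them in lists of up to batch_size.

    Bundles are the unit of commitment, so flushing the remainder in
    finish_bundle does not lose data; batches are simply capped by bundle size.
    """

    def __init__(self, batch_size=100):
        self._batch_size = batch_size

    def start_bundle(self):
        self._buffer = []

    def process(self, element):
        self._buffer.append(element)
        if len(self._buffer) >= self._batch_size:
            yield list(self._buffer)
            self._buffer = []

    def finish_bundle(self):
        if self._buffer:
            # finish_bundle must emit WindowedValues explicitly.
            yield WindowedValue(list(self._buffer), MIN_TIMESTAMP, [GlobalWindow()])
            self._buffer = []

(Beam's built-in beam.BatchElements transform implements essentially this
buffering, and GroupIntoBatches is the stateful, key-aware alternative
discussed above.)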


> >> On Fri, Apr 12, 2024 at 21.23 Ruben Vargas 
> wrote:
> >>>
> >>> Hello guys
> >>>
> >>> Maybe this question was already answered, but I cannot find it  and
> >>> want some more input on this topic.
> >>>
> >>> I have some messages that don't have any particular key candidate,
> >>> except the ID,  but I don't want to use it because the idea is to
> >>> group multiple IDs in the same batch.
> >>>
> >>> This is my use case:
> >>>
> >>> I have an endpoint where I'm gonna send the message ID, this endpoint
> >>> is gonna return me certain information which I will use to enrich my
> >>> message. In order to avoid fetching the endpoint per message I want to
> >>> batch it in 100 and send the 100 IDs in one request ( the endpoint
> >>> supports it). I was thinking of using GroupIntoBatches.
> >>>
> >>> - If I choose the ID as the key, my understanding is that it won't
> >>> work in the way I want (because it will form batches of the same ID).
> >>> - Using a constant will be a problem for parallelism, is that correct?
> >>>
> >>> Then my question is, what should I use as a key? Maybe something
> >>> regarding the timestamp? so I can have groups of messages that arrive
> >>> at a certain second?
> >>>
> >>> Any suggestions would be appreciated
> >>>
> >>> Thanks.
>


Re: Hot update in Dataflow without losing messages

2024-04-15 Thread Robert Bradshaw via user
Are you draining[1] your pipeline or simply canceling it and starting a new
one? Draining should close open windows and attempt to flush all in-flight
data before shutting down. For PubSub you may also need to read from
subscriptions rather than topics to ensure messages are processed by either
one or the other.

[1] https://cloud.google.com/dataflow/docs/guides/stopping-a-pipeline#drain
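
For reference, in the Python SDK the difference is just which argument you pass
to ReadFromPubSub (the paths below are placeholders):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

p = beam.Pipeline(options=PipelineOptions(streaming=True))

# Reading from an explicit subscription: the old job and its replacement pull
# from the same subscription, so each message is processed by one or the other.
messages = p | beam.io.ReadFromPubSub(
    subscription="projects/my-project/subscriptions/my-subscription")

# Reading from a topic instead gives each job its own job-created subscription,
# which is where in-flight messages can be stranded across a swap:
# messages = p | beam.io.ReadFromPubSub(topic="projects/my-project/topics/my-topic")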

On Mon, Apr 15, 2024 at 9:33 AM Juan Romero  wrote:

> Hi guys. Good morning.
>
> I have done some tests in Apache Beam on Dataflow in order to see if
> I can do a hot update or hot swap while the pipeline is processing a
> bunch of messages that fall in a time window of 10 minutes. What I saw is
> that when I do a hot update of the pipeline while there are still some
> messages in the time window (before sending them to the target), the
> current job is shut down and Dataflow creates a new one. The problem is that
> it seems that I am losing the messages that were being processed in the old
> one and they are not taken over by the new one, which implies we are
> losing data.
>
> Can you help me or recommend any strategy to me?
>
> Thanks!!
>


Re: Fails to run two multi-language pipelines locally?

2024-03-08 Thread Robert Bradshaw via user
The way that cross-language pipelines work is that each transform has
an attached "environment" in which its workers should be instantiated.
By default these are identified as docker images + a possible set of
dependencies. Transforms with the same environment can be colocated.

There is a tension between having large environments that can be
shared and more targeted ones that have fewer dependencies.

By default the environment for SQL relies on Beam's jar built as
"sdks:java:extensions:sql:expansion-service:shadowJar" and the
environment for Kafka relies on Beam's jar for
"sdks:java:io:expansion-service:shadowJar" To use a single jar you
could create a jar with both of these (plus whatever else you need)
and use that (the easiest would be to pass the flag

--beamServices='{
  "sdks:java:extensions:sql:expansion-service:shadowJar": "/path/to/uber.jar",
  "sdks:java:io:expansion-service:shadowJar": "/path/to/uber.jar"
}'

That being said, it should work just fine with distinct jars on an
arbitrary setup as well (and the fact that it works locally hints to
something being up with that setup). Not sure what your flink master
configuration is like, but maybe it's a docker-in-docker issue?

On Wed, Mar 6, 2024 at 8:30 PM Jaehyeon Kim  wrote:
>
> It may be a configuration issue. It works if I don't specify flink_master, 
> which uses an embedded flink cluster.
>
> On Thu, 7 Mar 2024 at 12:47, Jaehyeon Kim  wrote:
>>
>> It works fine if I only use Kafka read/write as I only see a single container
>> - two transforms (read and write) but a single container.
>>
>> If I add SqlTransform, however, another container is created and it begins
>> to produce an error. My speculation is that the containers don't recognise each
>> other and get killed by the Flink task manager. I see containers repeatedly
>> being created and killed.
>>
>> Does every multi-language pipeline run in a separate container?
>>
>> On Thu, 7 Mar 2024, 12:35 pm Robert Bradshaw via user, 
>>  wrote:
>>>
>>> Oh, sorry, I didn't see that.
>>>
>>> I would look earlier in the logs and see why it failed to bring up the
>>> containers (or, if they started, look in the container logs to see why
>>> they died).
>>>
>>> On Wed, Mar 6, 2024 at 5:28 PM Jaehyeon Kim  wrote:
>>> >
>>> > I am not using the python local runner but the flink runner. A flink 
>>> > cluster is started locally.
>>> >
>>> > On Thu, 7 Mar 2024 at 12:13, Robert Bradshaw via user 
>>> >  wrote:
>>> >>
>>> >> Streaming portable pipelines are not yet supported on the Python local 
>>> >> runner.
>>> >>
>>> >> On Wed, Mar 6, 2024 at 5:03 PM Jaehyeon Kim  wrote:
>>> >> >
>>> >> > Hello,
>>> >> >
>>> >> > I use the python SDK and my pipeline reads messages from Kafka and 
>>> >> > transforms via SQL. I see two containers are created but it seems that 
>>> >> > they don't communicate with each other so that the Flink task manager 
>>> >> > keeps killing the containers. The Flink cluster runs locally. Is there 
>>> >> > a way to run two multi-language pipelines (running on Docker) 
>>> >> > communicating with each other?
>>> >> >
>>> >> > Cheers,
>>> >> > Jaehyeon
>>> >> >
>>> >> >
>>> >> >
>>> >> > def run():
>>> >> > parser = argparse.ArgumentParser(
>>> >> > description="Process statistics by user from website visit 
>>> >> > event"
>>> >> > )
>>> >> > parser.add_argument(
>>> >> > "--inputs",
>>> >> > default="inputs",
>>> >> > help="Specify folder name that event records are saved",
>>> >> > )
>>> >> > parser.add_argument(
>>> >> > "--runner", default="FlinkRunner", help="Specify Apache Beam 
>>> >> > Runner"
>>> >> > )
>>> >> > opts = parser.parse_args()
>>> >> >
>>> >> > options = PipelineOptions()
>>> >> > pipeline_opts = {
>>> >> > "runner": opts.runner,
>>> >> > "flink_master"

Re: [Question] Python Streaming Pipeline Support

2024-03-08 Thread Robert Bradshaw via user
The Python Local Runner has limited support for streaming pipelines.
For the time being I would recommend using Dataflow or Flink (the latter
can be run locally) to try out streaming pipelines.
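
For reference, a minimal way to point a Python pipeline at a local Flink runner
(no flink_master set, so Beam starts an embedded cluster; Java must be
available) might look like:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(runner="FlinkRunner", streaming=True)

with beam.Pipeline(options=options) as p:
    _ = p | beam.Create([1, 2, 3]) | beam.Map(print)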

On Fri, Mar 8, 2024 at 2:11 PM Puertos tavares, Jose J (Canada) via
user  wrote:
>
> Hello  Hu:
>
>
>
> Not really. This one, as you have coded it, finishes as per
> stop_timestamp=time.time() + 16, and after it finishes emitting, everything
> else gets output and the pipeline terminates in batch mode.
>
>
>
> You can rule out STDOUT issues and confirm this behavior by putting a ParDo
> with something that would throw an exception after the GroupBy, or that writes
> temporary files/makes HTTP requests. This ParDo won't be executed until your
> PeriodicImpulse terminates (you can extend it to +60 and see that it is not
> triggered on your 4 second window, but only once it stops generating).
>
>
>
> I am looking for something that is really streaming and executes constantly,
> so that in this case, every 4 seconds, the window would process the elements
> in the window and wait for the next window to accumulate.
>
>
>
> Regards,
>
> JP
>
>
>
>
>
>
>
>
>
>
>
> From: XQ Hu 
> Sent: Friday, March 8, 2024 3:51 PM
> To: user@beam.apache.org
> Cc: Puertos tavares, Jose J (Canada) 
> Subject: [EXTERNAL] Re: [Question] Python Streaming Pipeline Support
>
>
>
>
> Is this what you are looking for?
>
>
>
> import random
> import time
>
> import apache_beam as beam
> from apache_beam.transforms import trigger, window
> from apache_beam.transforms.periodicsequence import PeriodicImpulse
> from apache_beam.utils.timestamp import Timestamp
>
> with beam.Pipeline() as p:
> input = (
> p
> | PeriodicImpulse(
> start_timestamp=time.time(),
> stop_timestamp=time.time() + 16,
> fire_interval=1,
> apply_windowing=False,
> )
> | beam.Map(lambda x: random.random())
> | beam.WindowInto(window.FixedWindows(4))
> | beam.GroupBy()
> | "Print Windows"
> >> beam.transforms.util.LogElements(with_timestamp=True, 
> with_window=True)
> )
>
>
>
> On Fri, Mar 8, 2024 at 6:48 AM Puertos tavares, Jose J (Canada) via user 
>  wrote:
>
> Hello Beam Users!
>
>
>
> I was looking into a simple example in Python to have an unbounded (--streaming
> flag) pipeline that generates random numbers, applies a fixed window (let's
> say 5 seconds) and then applies a group-by operation (reshuffle) and prints
> the result just to check.
>
>
>
> I notice that this seems to work as long as there is no grouping operation 
> (reshuffle, groupBy, etc.) that would leverage the windowing semantics.
>
>
>
> #Get Parameters from Command Line for the Pipeline
>
> known_args, pipeline_options = parser.parse_known_args(argv)
>
> pipeline_options = PipelineOptions(flags=argv)
>
>
>
> #Create pipeline
>
> p = beam.Pipeline(options=pipeline_options)
>
>
>
>
>
> #Execute Pipeline
>
> (p | "Start pipeline " >> beam.Create([0])
>
> | "Get values"  >> beam.ParDo(RandomNumberGenerator())
>
> | 'Applied fixed windows ' >> beam.WindowInto( window.FixedWindows(1*5) )
>
> | 'Reshuffle ' >> beam.Reshuffle()
>
> |  "Print" >> beam.Map(lambda x: print ("{} - {} ".format(os.getpid(), x) 
> ,flush=True ) )
>
> )
>
>
>
> result = p.run()
>
> result.wait_until_finish()
>
>
>
>
>
> Even though the Random Generator is unbounded and tagged as such with the
> decorator, it seems to get stuck; if I make that step finite (i.e. adding a
> counter and exiting) then the code works in regular batch mode.
>
>
>
> # =========================================================================
> # Class for Splittable DoFn: Random Generated numbers
> # =========================================================================
>
>
>
> @beam.transforms.core.DoFn.unbounded_per_element()
>
> class RandomNumberGenerator(beam.DoFn):
>
>
>
> @beam.transforms.core.DoFn.unbounded_per_element()
>
> def process(self, element ):
>
> import random
>
> import time
>
>
>
> counter=0
>
>
>
>
>
> while True:
>
>
>
> #if counter>5:
>
> #break
>
> nmb = random.randint(0, 1000)
>
> wait = random.randint(0, 5)
>
> rnow = time.time()
>
>
>
>
>
> print("Randy random", nmb)
>
>
>
> yield beam.window.TimestampedValue(nmb, rnow)
>
> time.sleep(wait)
>
> counter+=1
>
>
>
> I have tried to implement as per documentation the tracker and watermark, but 
> it seems that none of that seems to work either for the DirectRunner or 
> FlinkRunn

Re: Fails to run two multi-language pipelines locally?

2024-03-06 Thread Robert Bradshaw via user
Oh, sorry, I didn't see that.

I would look earlier in the logs and see why it failed to bring up the
containers (or, if they started, look in the container logs to see why
they died).

On Wed, Mar 6, 2024 at 5:28 PM Jaehyeon Kim  wrote:
>
> I am not using the python local runner but the flink runner. A flink cluster 
> is started locally.
>
> On Thu, 7 Mar 2024 at 12:13, Robert Bradshaw via user  
> wrote:
>>
>> Streaming portable pipelines are not yet supported on the Python local 
>> runner.
>>
>> On Wed, Mar 6, 2024 at 5:03 PM Jaehyeon Kim  wrote:
>> >
>> > Hello,
>> >
>> > I use the python SDK and my pipeline reads messages from Kafka and 
>> > transforms via SQL. I see two containers are created but it seems that 
>> > they don't communicate with each other so that the Flink task manager 
>> > keeps killing the containers. The Flink cluster runs locally. Is there a 
>> > way to run two multi-language pipelines (running on Docker) communicating 
>> > with each other?
>> >
>> > Cheers,
>> > Jaehyeon
>> >
>> >
>> >
>> > def run():
>> > parser = argparse.ArgumentParser(
>> > description="Process statistics by user from website visit event"
>> > )
>> > parser.add_argument(
>> > "--inputs",
>> > default="inputs",
>> > help="Specify folder name that event records are saved",
>> > )
>> > parser.add_argument(
>> > "--runner", default="FlinkRunner", help="Specify Apache Beam 
>> > Runner"
>> > )
>> > opts = parser.parse_args()
>> >
>> > options = PipelineOptions()
>> > pipeline_opts = {
>> > "runner": opts.runner,
>> > "flink_master": "localhost:8081",
>> > "job_name": "traffic-agg-sql",
>> > "environment_type": "LOOPBACK",
>> > "streaming": True,
>> > "parallelism": 3,
>> > "experiments": [
>> > "use_deprecated_read"
>> > ],  ## https://github.com/apache/beam/issues/20979
>> > "checkpointing_interval": "6",
>> > }
>> > options = PipelineOptions([], **pipeline_opts)
>> > # Required, else it will complain that when importing worker functions
>> > options.view_as(SetupOptions).save_main_session = True
>> >
>> > query = """
>> > WITH cte AS (
>> > SELECT id, CAST(event_datetime AS TIMESTAMP) AS ts
>> > FROM PCOLLECTION
>> > )
>> > SELECT
>> > CAST(TUMBLE_START(ts, INTERVAL '10' SECOND) AS VARCHAR) AS 
>> > window_start,
>> > CAST(TUMBLE_END(ts, INTERVAL '10' SECOND) AS VARCHAR) AS 
>> > window_end,
>> > COUNT(*) AS page_view
>> > FROM cte
>> > GROUP BY
>> > TUMBLE(ts, INTERVAL '10' SECOND), id
>> > """
>> >
>> > p = beam.Pipeline(options=options)
>> > (
>> > p
>> > | "Read from Kafka"
>> > >> kafka.ReadFromKafka(
>> > consumer_config={
>> > "bootstrap.servers": os.getenv(
>> > "BOOTSTRAP_SERVERS",
>> > "host.docker.internal:29092",
>> > ),
>> > "auto.offset.reset": "earliest",
>> > # "enable.auto.commit": "true",
>> > "group.id": "traffic-agg",
>> > },
>> > topics=["website-visit"],
>> > )
>> > | "Decode messages" >> beam.Map(decode_message)
>> > | "Parse elements" >> 
>> > beam.Map(parse_json).with_output_types(EventLog)
>> > | "Format timestamp" >> 
>> > beam.Map(format_timestamp).with_output_types(EventLog)
>> > | "Count per minute" >> SqlTransform(query)
>> > | beam.Map(print)
>> > )
>> >

Re: Fails to run two multi-language pipelines locally?

2024-03-06 Thread Robert Bradshaw via user
Streaming portable pipelines are not yet supported on the Python local runner.

On Wed, Mar 6, 2024 at 5:03 PM Jaehyeon Kim  wrote:
>
> Hello,
>
> I use the python SDK and my pipeline reads messages from Kafka and transforms 
> via SQL. I see two containers are created but it seems that they don't 
> communicate with each other so that the Flink task manager keeps killing the 
> containers. The Flink cluster runs locally. Is there a way to run two 
> multi-language pipelines (running on Docker) communicating with each other?
>
> Cheers,
> Jaehyeon
>
>
>
> def run():
> parser = argparse.ArgumentParser(
> description="Process statistics by user from website visit event"
> )
> parser.add_argument(
> "--inputs",
> default="inputs",
> help="Specify folder name that event records are saved",
> )
> parser.add_argument(
> "--runner", default="FlinkRunner", help="Specify Apache Beam Runner"
> )
> opts = parser.parse_args()
>
> options = PipelineOptions()
> pipeline_opts = {
> "runner": opts.runner,
> "flink_master": "localhost:8081",
> "job_name": "traffic-agg-sql",
> "environment_type": "LOOPBACK",
> "streaming": True,
> "parallelism": 3,
> "experiments": [
> "use_deprecated_read"
> ],  ## https://github.com/apache/beam/issues/20979
> "checkpointing_interval": "6",
> }
> options = PipelineOptions([], **pipeline_opts)
> # Required, else it will complain that when importing worker functions
> options.view_as(SetupOptions).save_main_session = True
>
> query = """
> WITH cte AS (
> SELECT id, CAST(event_datetime AS TIMESTAMP) AS ts
> FROM PCOLLECTION
> )
> SELECT
> CAST(TUMBLE_START(ts, INTERVAL '10' SECOND) AS VARCHAR) AS 
> window_start,
> CAST(TUMBLE_END(ts, INTERVAL '10' SECOND) AS VARCHAR) AS window_end,
> COUNT(*) AS page_view
> FROM cte
> GROUP BY
> TUMBLE(ts, INTERVAL '10' SECOND), id
> """
>
> p = beam.Pipeline(options=options)
> (
> p
> | "Read from Kafka"
> >> kafka.ReadFromKafka(
> consumer_config={
> "bootstrap.servers": os.getenv(
> "BOOTSTRAP_SERVERS",
> "host.docker.internal:29092",
> ),
> "auto.offset.reset": "earliest",
> # "enable.auto.commit": "true",
> "group.id": "traffic-agg",
> },
> topics=["website-visit"],
> )
> | "Decode messages" >> beam.Map(decode_message)
> | "Parse elements" >> beam.Map(parse_json).with_output_types(EventLog)
> | "Format timestamp" >> 
> beam.Map(format_timestamp).with_output_types(EventLog)
> | "Count per minute" >> SqlTransform(query)
> | beam.Map(print)
> )
>
> logging.getLogger().setLevel(logging.INFO)
> logging.info("Building pipeline ...")
>
> p.run().wait_until_finish()
>
> Here is the error message from the flink UI.
>
> 2024-03-07 12:01:41
> org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.UncheckedExecutionException:
>  java.lang.IllegalStateException: No container running for id 
> cd132e177c5867a80becaeffd50075e87287db468e069c61e02056c7cdc90fc8
> at 
> org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2086)
> at 
> org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache.get(LocalCache.java:4012)
> at 
> org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4035)
> at 
> org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:5013)
> at 
> org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:5020)
> at 
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.(DefaultJobBundleFactory.java:458)
> at 
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.(DefaultJobBundleFactory.java:443)
> at 
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:310)
> at 
> org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext.getStageBundleFactory(DefaultExecutableStageContext.java:38)
> at 
> org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.getStageBundleFactory(ReferenceCountingExecutableStageContextFactory.java:207)
> at 
> org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator.open(ExecutableStageDoFnOperator.java:258)
> at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperat

Re: Roadmap of Calcite support on Beam SQL?

2024-03-04 Thread Robert Bradshaw via user
There is no longer a huge amount of active development going on here,
but implementing a missing function seems like an easy contribution
(lots of examples to follow). Otherwise, definitely worth filing a
feature request as a useful signal for prioritization.

On Mon, Mar 4, 2024 at 4:33 PM Jaehyeon Kim  wrote:
>
> Hi Wiśniowski
>
> Thank you so much for your reply. The query works for me. As I'm new to Beam 
> SQL, I'd spend some more time before issuing a feature request.
>
> Cheers,
> Jaehyeon
>
> On Mon, 4 Mar 2024 at 23:03, Wiśniowski Piotr 
>  wrote:
>>
>> Hi,
>>
>> 1. I do not have up-to-date knowledge, but Beam SQL was missing quite a lot
>> of things regarding full Calcite support. I think the current way is to
>> create a feature request on the repository and get votes and interest. I
>> definitely would vote for your initiative ;)
>>
>> 2. Regarding the query itself I got it working for something like this:
>> ```
>>
>> WITH cte AS (
>> SELECT CAST(event_datetime AS TIMESTAMP) AS ts
>> FROM PCOLLECTION
>> )
>> SELECT
>> CAST(TUMBLE_START(cte.ts, INTERVAL '1' MINUTE) AS VARCHAR) AS start_time,
>> CAST(TUMBLE_END(cte.ts, INTERVAL '1' MINUTE) AS VARCHAR) AS end_time,
>> COUNT(*) AS page_views
>> FROM cte
>> GROUP BY
>> TUMBLE(cte.ts, INTERVAL '1' MINUTE)
>> ;
>>
>> ```
>>
>> Maybe it would be useful for you. Note that I am not up to date with recent 
>> versions of Beam SQL, but I will need to catch up (the syntax for defining 
>> window on table is cool).
>>
>> Best
>>
>> Wiśniowski Piotr
>>
>> On 4.03.2024 05:27, Jaehyeon Kim wrote:
>>
>> Hello,
>>
>> I just tried a simple tumbling window but failed with the following error
>>
>> RuntimeError: org.apache.beam.sdk.extensions.sql.impl.ParseException: Unable 
>> to parse query
>> WITH cte AS (
>> SELECT TO_TIMESTAMP(event_datetime) AS ts FROM PCOLLECTION
>> )
>> SELECT
>> CAST(window_start AS VARCHAR) AS start_time,
>> CAST(window_end AS VARCHAR) AS end_time,
>> COUNT(*) AS page_views
>> FROM TABLE(
>> TUMBLE(TABLE cte, DESCRIPTOR(ts), 'INTERVAL 1 MINUTE')
>> )
>> GROUP BY
>> window_start, window_end
>>
>> I guess it is because TO_TIMESTAMP is not implemented. When I check the 
>> documentation, it is missing lots of functions. Is there any roadmap about Calcite
>> support on Beam SQL?
>>
>> Cheers,
>> Jaehyeon


Re: ParDo(DoFn) with multiple context.output vs FlatMapElements

2024-01-26 Thread Robert Bradshaw via user
There is no difference; FlatMapElements is implemented in terms of a
DoFn that invokes context.output multiple times. And, yes, Dataflow
will fuse consecutive operations automatically. So if you have
something like

... -> DoFnA -> DoFnB -> GBK -> DoFnC -> ...

Dataflow will fuse DoFnA and DoFnB together, and if DoFnA produces a
lot of data for DoFnB to consume then more workers will be allocated
to handle the (DoFnA + DoFnB) combination. If the fanout is so huge
that a single worker would not be expected to handle the output DoFnA
produces from a single input, you could look into making DoFnA into a
SplittableDoFn https://beam.apache.org/blog/splittable-do-fn-is-available
. If DoFnB is just really expensive, you can also decouple the
parallelism between the two with a Reshuffle. Most of the time neither
of these is needed.
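
To make the first point concrete, here is a small Python sketch (the Java
FlatMapElements / multi-output DoFn case is analogous); the two branches below
produce the same elements:

import apache_beam as beam

class ExplodeDoFn(beam.DoFn):
    def process(self, element):
        # Emitting multiple outputs per input by yielding repeatedly
        # (the Java equivalent is calling context.output multiple times).
        for i in range(element):
            yield i

with beam.Pipeline() as p:
    nums = p | beam.Create([1, 2, 3])

    via_pardo = nums | "Via ParDo" >> beam.ParDo(ExplodeDoFn())
    via_flatmap = nums | "Via FlatMap" >> beam.FlatMap(range)

    # To give an expensive downstream DoFn its own parallelism, insert a
    # Reshuffle between the two steps, e.g.:
    #   nums | beam.ParDo(ExplodeDoFn()) | beam.Reshuffle() | beam.ParDo(ExpensiveDoFn())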

On Wed, Dec 27, 2023 at 5:44 PM hsy...@gmail.com  wrote:
>
> Hello
>
> I have a question. If I have a transform where for each input it will emit 1 or
> many outputs (to the same collection),
> I can do it with ParDo + DoFn, calling context.output multiple times in the
> processElement method for each input, vs doing it with FlatMapElements. Is there
> any difference? Does Dataflow fuse the downstream transform
> automatically? Eventually I want more downstream transform workers because it
> needs to handle more data. How am I supposed to do that?
>
> Regards,
> Siyuan


Re: Downloading and executing additional jar file when using Python API

2024-01-24 Thread Robert Bradshaw via user
On Wed, Jan 24, 2024 at 10:48 AM Mark Striebeck
 wrote:
>
> If I point Beam to the local jar, will Beam start and also stop the expansion
> service?

Yes it will.

> Thanks
>  Mark
>
> On Wed, 24 Jan 2024 at 08:30, Robert Bradshaw via user  
> wrote:
>>
>> You can also manually designate a replacement jar to be used rather
>> than fetching the jar from maven, either as a pipeline option or (as
>> of the next release) as an environment variable. The format is a json
>> mapping from gradle targets (which is how we identify these jars) to
>> local files (or urls). For example, pass
>>
>>   --beam_services='{":sdks:java:extensions:sql:expansion-service:shadowJar":
>> "/path/to/your/copy.jar"}'
>>
>> to use the local jar to automatically expand your SQL transforms.
>>
>> See the docs at
>> https://github.com/apache/beam/blob/7e95776a8d08ef738be49ef47842029c306f2bf5/sdks/python/apache_beam/options/pipeline_options.py#L587
>>
>> On Tue, Jan 23, 2024 at 5:59 PM Chamikara Jayalath via user
>>  wrote:
>> >
>> > The expansion service jar is needed since sql.py includes cross-language 
> > transforms that use the Java implementation under the hood.
>> >
>> > Once downloaded, the jar is cached, and subsequent jobs should use the jar 
>> > from that location.
>> >
>> > If you want to use a locally available jar, you can manually startup an 
>> > expansion service [1] and point the Python SQL transform to that [2].
>> >
>> > Thanks,
>> > Cham
>> >
>> > [1] 
>> > https://beam.apache.org/documentation/sdks/python-multi-language-pipelines/#choose-an-expansion-service
>> > [2] 
>> > https://github.com/apache/beam/blob/7ff25d896250508570b27683bc76523ac2fe3210/sdks/python/apache_beam/transforms/sql.py#L84
>> >
>> > On Tue, Jan 23, 2024 at 3:57 PM Mark Striebeck  
>> > wrote:
>> >>
>> >> Hi,
>> >>
>> >> Sorry, this question seems so obvious that I'm sure it came up before. 
>> >> But I couldn't find anything in the docs or the mail archives. Feel free 
>> >> to point me in the right direction...
>> >>
>> >> We are using the Python API for Beam. Recently we started using Beam SQL 
>> >> - which apparently needs a jar file that is not provided with the Python 
>> >> Pip package. When I run tests,I can see that Beam downloads 
>> >> beam-sdks-java-extensions-sql-expansion-service-2.52.0.jar and unpacks it 
>> >> into ~/.apache_beam and uses it to start an RPC server.
>> >>
>> >> While this works for local testing, I am trying to figure out how to work 
>> >> this into our CI and deployment process.
>> >>
>> >> Preferably would be to download a pip package that has this jar (and 
>> >> others) in it and just uses it.
>> >>
>> >> If that doesn't exist (I couldn't find it), then we'd need to check this 
>> >> jar file into our source tree, so that we can use it for CI but then also 
>> >> make it part of the docker image that we use to run our Beam pipelines on 
>> >> GCP Dataflow. How could I tell Beam to use that file instead of 
>> >> downloading it? I tried obvious settings like CLASSPATH environment 
>> >> variable - but nothing works. Beam always tries to fetch the file from 
>> >> maven.
>> >>
>> >> Again, feel free to point me to any relevant mail discussion or web page.
>> >>
>> >> Thanks
>> >>  Mark


Re: Downloading and executing additional jar file when using Python API

2024-01-24 Thread Robert Bradshaw via user
You can also manually designate a replacement jar to be used rather
than fetching the jar from maven, either as a pipeline option or (as
of the next release) as an environment variable. The format is a json
mapping from gradle targets (which is how we identify these jars) to
local files (or urls). For example, pass

  --beam_services='{":sdks:java:extensions:sql:expansion-service:shadowJar":
"/path/to/your/copy.jar"}'

to use the local jar to automatically expand your SQL transforms.

See the docs at
https://github.com/apache/beam/blob/7e95776a8d08ef738be49ef47842029c306f2bf5/sdks/python/apache_beam/options/pipeline_options.py#L587

On Tue, Jan 23, 2024 at 5:59 PM Chamikara Jayalath via user
 wrote:
>
> The expansion service jar is needed since sql.py includes cross-language 
> transforms that use the Java implementation under the hood.
>
> Once downloaded, the jar is cached, and subsequent jobs should use the jar 
> from that location.
>
> If you want to use a locally available jar, you can manually startup an 
> expansion service [1] and point the Python SQL transform to that [2].
>
> Thanks,
> Cham
>
> [1] 
> https://beam.apache.org/documentation/sdks/python-multi-language-pipelines/#choose-an-expansion-service
> [2] 
> https://github.com/apache/beam/blob/7ff25d896250508570b27683bc76523ac2fe3210/sdks/python/apache_beam/transforms/sql.py#L84
>
> On Tue, Jan 23, 2024 at 3:57 PM Mark Striebeck  
> wrote:
>>
>> Hi,
>>
>> Sorry, this question seems so obvious that I'm sure it came up before. But I 
>> couldn't find anything in the docs or the mail archives. Feel free to point 
>> me in the right direction...
>>
>> We are using the Python API for Beam. Recently we started using Beam SQL - 
>> which apparently needs a jar file that is not provided with the Python Pip 
>> package. When I run tests, I can see that Beam downloads
>> beam-sdks-java-extensions-sql-expansion-service-2.52.0.jar and unpacks it 
>> into ~/.apache_beam and uses it to start an RPC server.
>>
>> While this works for local testing, I am trying to figure out how to work 
>> this into our CI and deployment process.
>>
>> Preferably would be to download a pip package that has this jar (and others) 
>> in it and just uses it.
>>
>> If that doesn't exist (I couldn't find it), then we'd need to check this jar 
>> file into our source tree, so that we can use it for CI but then also make 
>> it part of the docker image that we use to run our Beam pipelines on GCP 
>> Dataflow. How could I tell Beam to use that file instead of downloading it? 
>> I tried obvious settings like CLASSPATH environment variable - but nothing 
>> works. Beam always tries to fetch the file from maven.
>>
>> Again, feel free to point me to any relevant mail discussion or web page.
>>
>> Thanks
>>  Mark


Re: TypeError: '_ConcatSequence' object is not subscriptable

2024-01-22 Thread Robert Bradshaw via user
This is probably because you're trying to index into the result of the
GroupByKey in your AnalyzeSession as if it were a list. All that is
promised is that it is an iterable. If it is large enough to merit
splitting over multiple fetches, it won't be a list. (If you need to
index, explicitly convert it into a list first, assuming it fits into
memory. Otherwise just stick to re-iterating over it.)
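
For instance, a minimal sketch of the fix inside AnalyzeSession (assuming each
session's events fit in memory; the body here is illustrative):

import apache_beam as beam

class AnalyzeSession(beam.DoFn):
    def process(self, element):
        session_id, events = element
        events = list(events)  # materialize the iterable; indexing and len() now work
        # ... session analysis using events[0], events[-1], len(events), etc. ...
        yield {"id": session_id, "event_count": len(events)}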

On Mon, Jan 22, 2024 at 7:21 AM Nimrod Shory  wrote:
>
> Hello everyone,
> We encounter a weird issue while running a Python + Beam streaming job on 
> Google Cloud Dataflow.
> The job listens to a PubSub subscription of events, and my pipeline looks 
> like this:
>
>> messages = (
>>  p | "Read Topic" >> 
>> beam.io.ReadFromPubSub(subscription=options.subscription.get())
>>| "JSON" >> beam.Map(json.loads)
>> )
>> sessions = (
>> messages | "Add Keys" >> beam.WithKeys(lambda x: x["id"])
>> | "Session Window" >> 
>> beam.WindowInto(beam.window.Sessions(SESSION_TIMEOUT))
>> | beam.GroupByKey()
>> | "Analyze Session" >> beam.ParDo(AnalyzeSession())
>> )
>> sessions | beam.io.WriteToPubSub(topic=options.session_topic.get())
>
>
>
> After it runs for some time without any issues, I suddenly start getting the 
> following error:
>
>> TypeError: '_ConcatSequence' object is not subscriptable
>
>
> Instead of getting the expected key value pair I usually get:
>>
>> ('ID123', [{...},{...},{...}])
>
>
> I start getting:
>>
>> ('ID234', <_ConcatSequence object at 0x7feca40d1d90>)
>
>
> I suspect this happens due to a heavy load, but I could not find any 
> information on why it could happen and how to mitigate it.
>
> Any help would be much appreciated!
> Thanks.


Re: Does withkeys transform enforce a reshuffle?

2024-01-19 Thread Robert Bradshaw via user
Reshuffle is perfectly fine to use if the goal is just to redistribute
work. It's only deprecated as a "checkpointing" mechanism.
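
For example, a sketch of redistributing work before the expensive step (the
source and DoFn here are placeholders for your PubsubLiteIO read and your own
ParDos):

import apache_beam as beam

class ExpensiveDoFn(beam.DoFn):
    def process(self, element):
        # Placeholder for the costly per-element work.
        yield element

with beam.Pipeline() as p:
    _ = (
        p
        | "Source" >> beam.Create(range(1000))    # stand-in for the PubsubLiteIO read
        | "Redistribute" >> beam.Reshuffle()      # breaks fusion so the next step can scale out
        | "Expensive step" >> beam.ParDo(ExpensiveDoFn())
    )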

On Fri, Jan 19, 2024 at 9:44 AM Danny McCormick via user
 wrote:
>
> For runners that support Reshuffle, it should be safe to use. Its been 
> "deprecated" for 7 years, but is still heavily used/often the recommended way 
> to do things like this. I actually just added a PR to undeprecate it earlier 
> today. Looks like you're using Dataflow, which also has always supported 
> ReShuffle.
>
> > Also I looked at the code; reshuffle seems to be doing some groupby work
> > internally. But I don't really need groupby
>
> Groupby is basically an implementation detail that creates the desired 
> shuffling behavior in many runners (runners can also override transform 
> implementations if needed for some primitives like this, but that's another 
> can of worms). Basically, in order to prevent fusion you need some operation 
> that does this and GroupBy is one option.
>
> Given that you're using DataFlow, I'd also recommend checking out 
> https://cloud.google.com/dataflow/docs/pipeline-lifecycle#prevent_fusion 
> which describes how to do this in more detail.
>
> Thanks,
> Danny
>
> On Fri, Jan 19, 2024 at 12:36 PM hsy...@gmail.com  wrote:
>>
>> Also I looked at the code; reshuffle seems to be doing some groupby work
>> internally. But I don't really need groupby
>>
>> On Fri, Jan 19, 2024 at 9:35 AM hsy...@gmail.com  wrote:
>>>
>>> ReShuffle is deprecated
>>>
>>> On Fri, Jan 19, 2024 at 8:25 AM XQ Hu via user  wrote:

 I do not think it enforces a reshuffle by just checking the doc here: 
 https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html?highlight=withkeys#apache_beam.transforms.util.WithKeys

 Have you tried to just add ReShuffle after PubsubLiteIO?

 On Thu, Jan 18, 2024 at 8:54 PM hsy...@gmail.com  wrote:
>
> Hey guys,
>
> I have a question, does withkeys transformation enforce a reshuffle?
>
> My pipeline basically look like this PubsubLiteIO -> ParDo(..) -> ParDo() 
> -> BigqueryIO.write()
>
> The problem is PubsubLiteIO -> ParDo(..) -> ParDo() always get fused
> together. But the ParDo is expensive and I want Dataflow to have more
> workers to work on that; what's the best way to do that?
>
> Regards,
>


Re: How to debug ArtifactStagingService ?

2024-01-05 Thread Robert Bradshaw via user
Nothing problematic is standing out for me in those logs. A job
service and artifact staging service is spun up to allow the job (and
its artifacts) to be submitted, then they are shut down. What are the
actual errors that you are seeing?

On Wed, Jan 3, 2024 at 7:39 AM Lydian  wrote:
>
>
> Hi,
>
> We are running Beam 2.41.0 with the portable Flink runner using the Python SDK.
> However, we suddenly noticed that all our jobs are now failing with errors
> like this:
> ```
> 2024-01-03 15:35:30,067 INFO  
> org.apache.beam.runners.jobsubmission.JobServerDriver[] - 
> ArtifactStagingService started on localhost:41047
> 2024-01-03 15:35:31,640 INFO  
> org.apache.beam.runners.jobsubmission.JobServerDriver[] - Java 
> ExpansionService started on localhost:35299
> 2024-01-03 15:35:31,676 INFO  
> org.apache.beam.runners.jobsubmission.JobServerDriver[] - JobService 
> started on localhost:42519
> 2024-01-03 15:35:31,677 INFO  
> org.apache.beam.runners.jobsubmission.JobServerDriver[] - Job server 
> now running, terminate with Ctrl+C
> 2024-01-03 15:35:31,996 INFO  
> org.apache.beam.runners.flink.FlinkPortableClientEntryPoint  [] - Started 
> driver program
> 2024-01-03 15:35:43,899 INFO  
> org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService [] - 
> Staging artifacts for job_12e792dc-6e6f-417f-aad3-0da89df2b6d8.
> 2024-01-03 15:35:43,899 INFO  
> org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService [] - 
> Resolving artifacts for 
> job_12e792dc-6e6f-417f-aad3-0da89df2b6d8.ref_Environment_d
> efault_environment_2.
> 2024-01-03 15:35:43,902 INFO  
> org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService [] - 
> Getting 0 artifacts for 
> job_12e792dc-6e6f-417f-aad3-0da89df2b6d8.external_1beam:en
> v:process:v1.
> 2024-01-03 15:35:43,902 INFO  
> org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService [] - 
> Resolving artifacts for 
> job_12e792dc-6e6f-417f-aad3-0da89df2b6d8.external_1beam:en
> v:process:v1.
> 2024-01-03 15:35:43,903 INFO  
> org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService [] - 
> Getting 1 artifacts for job_12e792dc-6e6f-417f-aad3-0da89df2b6d8.null.
> 2024-01-03 15:36:02,047 INFO  
> org.apache.beam.runners.flink.FlinkPortableClientEntryPoint  [] - Stopping 
> job service
> 2024-01-03 15:36:02,050 INFO  
> org.apache.beam.runners.jobsubmission.JobServerDriver[] - JobServer 
> stopped on localhost:42519
> ```
> It seems like the error is related to the ArtifactStagingService, but I am 
> having trouble identifying the root cause. Wondering if someone would be able 
> to help me figure out how to pull more informative debug logging to fix this 
> issue. Thanks!
>
> Sincerely,
> Lydian Lee
>


Re: Dataflow not able to find a module specified using extra_package

2023-12-19 Thread Robert Bradshaw via user
And should it be a list of strings, rather than a string?
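
In the options dict from the original message that would look something like
(same placeholder path as before):

pipeline_options = {
    # ... the other options from the original message ...
    'extra_packages': [uplight_telemetry_tar_file_path],  # plural key, list of paths
}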

On Tue, Dec 19, 2023 at 10:10 AM Anand Inguva via user 
wrote:

> Can you try passing `extra_packages` instead of `extra_package` when
> passing pipeline options as a dict?
>
> On Tue, Dec 19, 2023 at 12:26 PM Sumit Desai via user <
> user@beam.apache.org> wrote:
>
>> Hi all,
>> I have created a Dataflow pipeline in batch mode using the Apache Beam Python
>> SDK. I am using one non-public dependency, 'uplight-telemetry'. I have
>> specified it using the parameter extra_package while creating the pipeline_options
>> object. However, the pipeline loading is failing with the error *No
>> module named 'uplight_telemetry'*.
>> The code to create pipeline_options is as follows:
>>
>> def __create_pipeline_options_dataflow(job_name):
>> # Set up the Dataflow runner options
>> gcp_project_id = os.environ.get(GCP_PROJECT_ID)
>> current_dir = os.path.dirname(os.path.abspath(__file__))
>> print("current_dir=", current_dir)
>> setup_file_path = os.path.join(current_dir, '..', '..', 'setup.py')
>> print("Set-up file path=", setup_file_path)
>> #TODO:Move file to proper location
>> uplight_telemetry_tar_file_path=os.path.join(current_dir, '..', 
>> '..','..','non-public-dependencies', 'uplight-telemetry-1.0.0.tar.gz')
>> # TODO:Move to environmental variables
>> pipeline_options = {
>> 'project': gcp_project_id,
>> 'region': "us-east1",
>> 'job_name': job_name,  # Provide a unique job name
>> 'temp_location': 
>> f'gs://{TAS_GCS_BUCKET_NAME_PREFIX}{os.getenv("UP_PLATFORM_ENV")}/temp',
>> 'staging_location': 
>> f'gs://{TAS_GCS_BUCKET_NAME_PREFIX}{os.getenv("UP_PLATFORM_ENV")}/staging',
>> 'runner': 'DataflowRunner',
>> 'save_main_session': True,
>> 'service_account_email': os.environ.get(SERVICE_ACCOUNT),
>> # 'network': f'projects/{gcp_project_id}/global/networks/default',
>> 'subnetwork': os.environ.get(SUBNETWORK_URL),
>> 'setup_file': setup_file_path,
>> 'extra_package': uplight_telemetry_tar_file_path
>> # 'template_location': 
>> 'gcr.io/dataflow-templates-base/python310-template-launcher-base'
>> }
>> print("Pipeline created for job-name", job_name)
>> logger.debug(f"pipeline_options created as {pipeline_options}")
>> return pipeline_options
>>
>> Why is it not trying to install this package from extra_package?
>>
>


Re: Streaming management exception in the sink target.

2023-12-05 Thread Robert Bradshaw via user
Currently error handling is implemented on sinks on an ad-hoc basis
(if at all), but John (cc'd) is looking at improving things here.

On Mon, Dec 4, 2023 at 10:25 AM Juan Romero  wrote:
>
> Hi guys. I want to ask you how to deal with the scenario where the
> target sink (e.g. JDBC, Kafka, BigQuery, Pub/Sub, etc.) fails for any reason. I
> don't want to lose the message or create a bottleneck with many errors due to
> a hypothetical target sink problem, and I want to use
> with_exception_handling in order to catch the messages that fail to reach the
> target and send them to another error topic. Any idea how to solve this
> scenario?


Re: [QUESTION] Why no auto labels?

2023-10-20 Thread Robert Bradshaw via user
>>> problem rather than silently give corrupt data.
>>>>>
>>>>>
>>>>> On Fri, Oct 13, 2023 at 7:15 AM Joey Tran  
>>>>> wrote:
>>>>>>
>>>>>> For posterity: https://github.com/apache/beam/pull/28984
>>>>>>
>>>>>> On Tue, Oct 10, 2023 at 7:29 PM Robert Bradshaw  
>>>>>> wrote:
>>>>>>>
>>>>>>> I would definitely support a PR making this an option. Changing the 
>>>>>>> default would be a rather big change that would require more thought.
>>>>>>>
>>>>>>> On Tue, Oct 10, 2023 at 4:24 PM Joey Tran  
>>>>>>> wrote:
>>>>>>>>
>>>>>>>> Bump on this. Sorry to pester - I'm trying to get a few teams to adopt 
>>>>>>>> Apache Beam at my company and I'm trying to foresee parts of the API 
>>>>>>>> they might find inconvenient.
>>>>>>>>
>>>>>>>> If there's a conclusion to make the behavior similar to java, I'm 
>>>>>>>> happy to put up a PR
>>>>>>>>
>>>>>>>> On Thu, Oct 5, 2023, 12:49 PM Joey Tran  
>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>> Is it really toggleable in Java? I imagine that if it's a toggle it'd 
>>>>>>>>> be a very sticky toggle since it'd be easy for PTransforms to 
>>>>>>>>> accidentally rely on it.
>>>>>>>>>
>>>>>>>>> On Thu, Oct 5, 2023 at 12:33 PM Robert Bradshaw  
>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>> Huh. This used to be a hard error in Java, but I guess it's 
>>>>>>>>>> togglable with an option now. We should probably add the option to 
>>>>>>>>>> toggle Python too. (Unclear what the default should be, but this 
>>>>>>>>>> probably ties into re-thinking how pipeline update should work.)
>>>>>>>>>>
>>>>>>>>>> On Thu, Oct 5, 2023 at 4:58 AM Joey Tran  
>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Makes sense that the requirement is the same, but is the label 
>>>>>>>>>>> auto-generation behavior the same? I modified the BeamJava 
>>>>>>>>>>> wordcount example[1] to do the regex filter twice in a row, and 
>>>>>>>>>>> unlike the BeamPython example I posted before, it just warns 
>>>>>>>>>>> instead of throwing an exception.
>>>>>>>>>>>
>>>>>>>>>>> Tangentially, is it expected that the Beam playground examples 
>>>>>>>>>>> don't have a way to see the outputs of a run example? I have a 
>>>>>>>>>>> vague memory that there used to be a way to navigate to an output 
>>>>>>>>>>> file after it's generated but not sure if I just dreamt that up. 
>>>>>>>>>>> Playing with the examples, I wasn't positive if my runs were 
>>>>>>>>>>> actually succeeding or not based on the stdout alone.
>>>>>>>>>>>
>>>>>>>>>>> [1] https://play.beam.apache.org/?sdk=java&shared=mI7WUeje_r2
>>>>>>>>>>> [2] https://play.beam.apache.org/?sdk=python&shared=hIrm7jvCamW
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Oct 4, 2023 at 12:16 PM Robert Bradshaw via user 
>>>>>>>>>>>  wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> BeamJava and BeamPython have the exact same behavior: transform 
>>>>>>>>>>>> names within must be distinct [1]. This is because we do not 
>>>>>>>>>>>> necessarily know at pipeline construction time if the pipeline 
>>>>>>>>>>>> will be streaming or batch, or if it will be updated in the 
>>>>>>>>>>>> future, so the decision was made to impose this restriction up 
>>>>>>>>>>>> front.

Re: Advanced Composite Transform Documentation

2023-10-19 Thread Robert Bradshaw via user
On Thu, Oct 19, 2023 at 2:00 PM Joey Tran  wrote:
>
> For the Python SDK, is there somewhere where we document more "advanced"
> composite transform operations?

I'm not sure, but
https://beam.apache.org/documentation/programming-guide/ is the
canonical place where information like this should probably be. Maybe this
users list will serve as a searchable resource at least. (Stack
Overflow can be good sometimes as well.)

> e.g. I've been stumbling with questions like "How do I use a transform that 
> expects a PBegin in a composite transform",

As you mentioned, you do "pipeline | Transform," and you can get the
pipeline object from any PCollection you have in hand.

> "What's the proper way to return multiple output pcollections?",

You can return them as a(n ordinary) tuple or a dict (with string
keys). This is best expressed with the typescript implementation
(https://github.com/apache/beam/blob/master/sdks/typescript/src/apache_beam/pvalue.ts#L172
) but works for Python too.

> "What's the proper way to typehint multiple output pcollections?"

Typehinting for multiple outputs is still a work in progress, but I
would just add standard Python typehints to the expand method (which
is where we'd pick them up).
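
Putting those answers together, a small hedged sketch (the transform and its
outputs are made-up examples, not a documented pattern):

import apache_beam as beam

class SplitEvenOdd(beam.PTransform):
    def expand(self, pcoll):
        # A PBegin-expecting transform can be applied via the pipeline object,
        # reachable from any PCollection as pcoll.pipeline. (Unused below;
        # shown only to illustrate the access pattern.)
        _constants = pcoll.pipeline | "Constants" >> beam.Create([0])

        evens = pcoll | "Evens" >> beam.Filter(lambda x: x % 2 == 0)
        odds = pcoll | "Odds" >> beam.Filter(lambda x: x % 2 == 1)

        # Multiple outputs can be returned as an ordinary tuple or a dict of
        # PCollections keyed by strings.
        return {"even": evens, "odd": odds}

with beam.Pipeline() as p:
    outputs = p | beam.Create(range(10)) | SplitEvenOdd()
    outputs["even"] | "Print evens" >> beam.Map(print)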

> ChatGPT helped me figure out the first question (use `pcoll.pipeline`), the 
> second question I guessed and the third question I'm still unsure about.
>
> Tried looking for these answers in the documentation but might just be 
> missing it.
>
> Best,
> Joey


Re: [QUESTION] Why no auto labels?

2023-10-13 Thread Robert Bradshaw via user
>>>> wrote:
>>>>>>>
>>>>>>>> Makes sense that the requirement is the same, but is the label
>>>>>>>> auto-generation behavior the same? I modified the BeamJava
>>>>>>>> wordcount example[1] to do the regex filter twice in a row, and unlike 
>>>>>>>> the
>>>>>>>> BeamPython example I posted before, it just warns instead of throwing 
>>>>>>>> an
>>>>>>>> exception.
>>>>>>>>
>>>>>>>> Tangentially, is it expected that the Beam playground examples
>>>>>>>> don't have a way to see the outputs of a run example? I have a vague 
>>>>>>>> memory
>>>>>>>> that there used to be a way to navigate to an output file after it's
>>>>>>>> generated but not sure if I just dreamt that up. Playing with the 
>>>>>>>> examples,
>>>>>>>> I wasn't positive if my runs were actually succeeding or not based on 
>>>>>>>> the
>>>>>>>> stdout alone.
>>>>>>>>
>>>>>>>> [1] https://play.beam.apache.org/?sdk=java&shared=mI7WUeje_r2
>>>>>>>> <https://play.beam.apache.org/?sdk=java&shared=mI7WUeje_r2>
>>>>>>>> [2] https://play.beam.apache.org/?sdk=python&shared=hIrm7jvCamW
>>>>>>>>
>>>>>>>> On Wed, Oct 4, 2023 at 12:16 PM Robert Bradshaw via user <
>>>>>>>> user@beam.apache.org> wrote:
>>>>>>>>
>>>>>>>>> BeamJava and BeamPython have the exact same behavior:
>>>>>>>>> transform names within must be distinct [1]. This is because we do not
>>>>>>>>> necessarily know at pipeline construction time if the pipeline will be
>>>>>>>>> streaming or batch, or if it will be updated in the future, so the 
>>>>>>>>> decision
>>>>>>>>> was made to impose this restriction up front. Both will auto-generate 
>>>>>>>>> a
>>>>>>>>> name for you if one is not given, but will do so deterministically 
>>>>>>>>> (not
>>>>>>>>> depending on some global context) to avoid potential update problems.
>>>>>>>>>
>>>>>>>>> [1] Note that this applies to the fully qualified transform name,
>>>>>>>>> so the naming only has to be distinct within a composite transform 
>>>>>>>>> (or at
>>>>>>>>> the top level--the pipeline itself is isomorphic to a single composite
>>>>>>>>> transform).
>>>>>>>>>
>>>>>>>>> On Wed, Oct 4, 2023 at 3:43 AM Joey Tran <
>>>>>>>>> joey.t...@schrodinger.com> wrote:
>>>>>>>>>
>>>>>>>>>> Cross posting this thread to dev@ to see if this is intentional
>>>>>>>>>> behavior or if it's something worth changing for the python SDK
>>>>>>>>>>
>>>>>>>>>> On Tue, Oct 3, 2023, 10:10 PM XQ Hu via user <
>>>>>>>>>> user@beam.apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>>> That suggests the default label is created as that, which indeed
>>>>>>>>>>> causes the duplication error.
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Oct 3, 2023 at 9:15 PM Joey Tran <
>>>>>>>>>>> joey.t...@schrodinger.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Not sure what that suggests
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Oct 3, 2023, 6:24 PM XQ Hu via user <
>>>>>>>>>>>> user@beam.apache.org> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Looks like this is the current behaviour. If you have `t =
>>>>>>>>>>>>> beam.Filter(identity_filter)`, `t.label` is defined as
>>>>>>>>>>>>> `Filter(identity_filter)`.
>>>>>>>>>>>>

Re: [QUESTION] Why no auto labels?

2023-10-13 Thread Robert Bradshaw via user
Thanks for the PR.

I think we should follow Java and allow non-unique labels, but not provide
automatic uniquification. In particular, the danger of using a counter is
that one can get accidental (and potentially hard to check) off-by-one
collisions. As a concrete example, imagine one partitions a dataset into
two collections, each followed by a similarly-named transform.

    --> B
   /
  A
   \
    --> B

Uniquification would give something like

    --> B
   /
  A
   \
    --> B_2

Suppose one then realizes there's a third case to handle, giving

    --> B
   /
  A --> B
   \
    --> B

But this would be uniquified to

    --> B
   /
  A --> B_2
   \
    --> B_3

where the old B_2 got renamed to B_3 and a new B_2 got put in its place.
This is bad because an updating runner would then attribute old B_2's state
to the new B_2 (and also possibly mis-direct any inflight messages). At
least with the old, intersecting names we can detect this problem
rather than silently give corrupt data.


On Fri, Oct 13, 2023 at 7:15 AM Joey Tran  wrote:

> For posterity: https://github.com/apache/beam/pull/28984
>
> On Tue, Oct 10, 2023 at 7:29 PM Robert Bradshaw 
> wrote:
>
>> I would definitely support a PR making this an option. Changing the
>> default would be a rather big change that would require more thought.
>>
>> On Tue, Oct 10, 2023 at 4:24 PM Joey Tran 
>> wrote:
>>
>>> Bump on this. Sorry to pester - I'm trying to get a few teams to adopt
>>> Apache Beam at my company and I'm trying to foresee parts of the API they
>>> might find inconvenient.
>>>
>>> If there's a conclusion to make the behavior similar to java, I'm happy
>>> to put up a PR
>>>
>>> On Thu, Oct 5, 2023, 12:49 PM Joey Tran 
>>> wrote:
>>>
>>>> Is it really toggleable in Java? I imagine that if it's a toggle it'd
>>>> be a very sticky toggle since it'd be easy for PTransforms to accidentally
>>>> rely on it.
>>>>
>>>> On Thu, Oct 5, 2023 at 12:33 PM Robert Bradshaw 
>>>> wrote:
>>>>
>>>>> Huh. This used to be a hard error in Java, but I guess it's togglable
>>>>> with an option now. We should probably add the option to toggle Python 
>>>>> too.
>>>>> (Unclear what the default should be, but this probably ties into
>>>>> re-thinking how pipeline update should work.)
>>>>>
>>>>> On Thu, Oct 5, 2023 at 4:58 AM Joey Tran 
>>>>> wrote:
>>>>>
>>>>>> Makes sense that the requirement is the same, but is the label
>>>>>> auto-generation behavior the same? I modified the BeamJava
>>>>>> wordcount example[1] to do the regex filter twice in a row, and unlike 
>>>>>> the
>>>>>> BeamPython example I posted before, it just warns instead of throwing an
>>>>>> exception.
>>>>>>
>>>>>> Tangentially, is it expected that the Beam playground examples don't
>>>>>> have a way to see the outputs of a run example? I have a vague memory 
>>>>>> that
>>>>>> there used to be a way to navigate to an output file after it's generated
>>>>>> but not sure if I just dreamt that up. Playing with the examples, I 
>>>>>> wasn't
>>>>>> positive if my runs were actually succeeding or not based on the stdout
>>>>>> alone.
>>>>>>
>>>>>> [1] https://play.beam.apache.org/?sdk=java&shared=mI7WUeje_r2
>>>>>> <https://play.beam.apache.org/?sdk=java&shared=mI7WUeje_r2>
>>>>>> [2] https://play.beam.apache.org/?sdk=python&shared=hIrm7jvCamW
>>>>>>
>>>>>> On Wed, Oct 4, 2023 at 12:16 PM Robert Bradshaw via user <
>>>>>> user@beam.apache.org> wrote:
>>>>>>
>>>>>>> BeamJava and BeamPython have the exact same behavior:
>>>>>>> transform names within must be distinct [1]. This is because we do not
>>>>>>> necessarily know at pipeline construction time if the pipeline will be
>>>>>>> streaming or batch, or if it will be updated in the future, so the 
>>>>>>> decision
>>>>>>> was made to impose this restriction up front. Both will auto-generate a
>>>>>>> name for you if one is not given, but will do so deterministically (not
>>>>>>> depending on some global context

Re: [QUESTION] Why no auto labels?

2023-10-10 Thread Robert Bradshaw via user
I would definitely support a PR making this an option. Changing the default
would be a rather big change that would require more thought.

On Tue, Oct 10, 2023 at 4:24 PM Joey Tran  wrote:

> Bump on this. Sorry to pester - I'm trying to get a few teams to adopt
> Apache Beam at my company and I'm trying to foresee parts of the API they
> might find inconvenient.
>
> If there's a conclusion to make the behavior similar to java, I'm happy to
> put up a PR
>
> On Thu, Oct 5, 2023, 12:49 PM Joey Tran  wrote:
>
>> Is it really toggleable in Java? I imagine that if it's a toggle it'd be
>> a very sticky toggle since it'd be easy for PTransforms to accidentally
>> rely on it.
>>
>> On Thu, Oct 5, 2023 at 12:33 PM Robert Bradshaw 
>> wrote:
>>
>>> Huh. This used to be a hard error in Java, but I guess it's togglable
>>> with an option now. We should probably add the option to toggle Python too.
>>> (Unclear what the default should be, but this probably ties into
>>> re-thinking how pipeline update should work.)
>>>
>>> On Thu, Oct 5, 2023 at 4:58 AM Joey Tran 
>>> wrote:
>>>
>>>> Makes sense that the requirement is the same, but is the label
>>>> auto-generation behavior the same? I modified the BeamJava
>>>> wordcount example[1] to do the regex filter twice in a row, and unlike the
>>>> BeamPython example I posted before, it just warns instead of throwing an
>>>> exception.
>>>>
>>>> Tangentially, is it expected that the Beam playground examples don't
>>>> have a way to see the outputs of a run example? I have a vague memory that
>>>> there used to be a way to navigate to an output file after it's generated
>>>> but not sure if I just dreamt that up. Playing with the examples, I wasn't
>>>> positive if my runs were actually succeeding or not based on the stdout
>>>> alone.
>>>>
>>>> [1] https://play.beam.apache.org/?sdk=java&shared=mI7WUeje_r2
>>>> <https://play.beam.apache.org/?sdk=java&shared=mI7WUeje_r2>
>>>> [2] https://play.beam.apache.org/?sdk=python&shared=hIrm7jvCamW
>>>>
>>>> On Wed, Oct 4, 2023 at 12:16 PM Robert Bradshaw via user <
>>>> user@beam.apache.org> wrote:
>>>>
>>>>> BeamJava and BeamPython have the exact same behavior: transform names
>>>>> within must be distinct [1]. This is because we do not necessarily know at
>>>>> pipeline construction time if the pipeline will be streaming or batch, or
>>>>> if it will be updated in the future, so the decision was made to impose
>>>>> this restriction up front. Both will auto-generate a name for you if one 
>>>>> is
>>>>> not given, but will do so deterministically (not depending on some global
>>>>> context) to avoid potential update problems.
>>>>>
>>>>> [1] Note that this applies to the fully qualified transform name, so
>>>>> the naming only has to be distinct within a composite transform (or at the
>>>>> top level--the pipeline itself is isomorphic to a single composite
>>>>> transform).
>>>>>
>>>>> On Wed, Oct 4, 2023 at 3:43 AM Joey Tran 
>>>>> wrote:
>>>>>
>>>>>> Cross posting this thread to dev@ to see if this is intentional
>>>>>> behavior or if it's something worth changing for the python SDK
>>>>>>
>>>>>> On Tue, Oct 3, 2023, 10:10 PM XQ Hu via user 
>>>>>> wrote:
>>>>>>
>>>>>>> That suggests the default label is created as that, which indeed
>>>>>>> causes the duplication error.
>>>>>>>
>>>>>>> On Tue, Oct 3, 2023 at 9:15 PM Joey Tran 
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Not sure what that suggests
>>>>>>>>
>>>>>>>> On Tue, Oct 3, 2023, 6:24 PM XQ Hu via user 
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Looks like this is the current behaviour. If you have `t =
>>>>>>>>> beam.Filter(identity_filter)`, `t.label` is defined as
>>>>>>>>> `Filter(identity_filter)`.
>>>>>>>>>
>>>>>>>>> On Mon, Oct 2, 2023 at 9:25 AM Joey Tran <
>>>>>>>>

Re: [QUESTION] Why no auto labels?

2023-10-05 Thread Robert Bradshaw via user
Huh. This used to be a hard error in Java, but I guess it's togglable
with an option now. We should probably add the option to toggle Python too.
(Unclear what the default should be, but this probably ties into
re-thinking how pipeline update should work.)

On Thu, Oct 5, 2023 at 4:58 AM Joey Tran  wrote:

> Makes sense that the requirement is the same, but is the label
> auto-generation behavior the same? I modified the BeamJava
> wordcount example[1] to do the regex filter twice in a row, and unlike the
> BeamPython example I posted before, it just warns instead of throwing an
> exception.
>
> Tangentially, is it expected that the Beam playground examples don't have
> a way to see the outputs of a run example? I have a vague memory that there
> used to be a way to navigate to an output file after it's generated but not
> sure if I just dreamt that up. Playing with the examples, I wasn't positive
> if my runs were actually succeeding or not based on the stdout alone.
>
> [1] https://play.beam.apache.org/?sdk=java&shared=mI7WUeje_r2
> <https://play.beam.apache.org/?sdk=java&shared=mI7WUeje_r2>
> [2] https://play.beam.apache.org/?sdk=python&shared=hIrm7jvCamW
>
> On Wed, Oct 4, 2023 at 12:16 PM Robert Bradshaw via user <
> user@beam.apache.org> wrote:
>
>> BeamJava and BeamPython have the exact same behavior: transform names
>> within must be distinct [1]. This is because we do not necessarily know at
>> pipeline construction time if the pipeline will be streaming or batch, or
>> if it will be updated in the future, so the decision was made to impose
>> this restriction up front. Both will auto-generate a name for you if one is
>> not given, but will do so deterministically (not depending on some global
>> context) to avoid potential update problems.
>>
>> [1] Note that this applies to the fully qualified transform name, so the
>> naming only has to be distinct within a composite transform (or at the top
>> level--the pipeline itself is isomorphic to a single composite transform).
>>
>> On Wed, Oct 4, 2023 at 3:43 AM Joey Tran 
>> wrote:
>>
>>> Cross posting this thread to dev@ to see if this is intentional
>>> behavior or if it's something worth changing for the python SDK
>>>
>>> On Tue, Oct 3, 2023, 10:10 PM XQ Hu via user 
>>> wrote:
>>>
>>>> That suggests the default label is created as that, which indeed causes
>>>> the duplication error.
>>>>
>>>> On Tue, Oct 3, 2023 at 9:15 PM Joey Tran 
>>>> wrote:
>>>>
>>>>> Not sure what that suggests
>>>>>
>>>>> On Tue, Oct 3, 2023, 6:24 PM XQ Hu via user 
>>>>> wrote:
>>>>>
>>>>>> Looks like this is the current behaviour. If you have `t =
>>>>>> beam.Filter(identity_filter)`, `t.label` is defined as
>>>>>> `Filter(identity_filter)`.
>>>>>>
>>>>>> On Mon, Oct 2, 2023 at 9:25 AM Joey Tran 
>>>>>> wrote:
>>>>>>
>>>>>>> You don't have to specify the names if the callable you pass in is
>>>>>>> /different/ for two `beam.Map`s, but  if the callable is the same you 
>>>>>>> must
>>>>>>> specify a label. For example, the below will raise an exception:
>>>>>>>
>>>>>>> ```
>>>>>>> | beam.Filter(identity_filter)
>>>>>>> | beam.Filter(identity_filter)
>>>>>>> ```
>>>>>>>
>>>>>>> Here's an example on playground that shows the error message you get
>>>>>>> [1]. I marked every line I added with a "# ++".
>>>>>>>
>>>>>>> It's a contrived example, but using a map or filter at the same
>>>>>>> pipeline level probably comes up often, at least in my inexperience. For
>>>>>>> example, you might have a pipeline that partitions a pcoll into three
>>>>>>> different pcolls, runs some transforms on them, and then runs the same 
>>>>>>> type
>>>>>>> of filter on each of them.
>>>>>>>
>>>>>>> The case that happens most often for me is using the `assert_that`
>>>>>>> [2] testing transform. In this case, I think often users will really 
>>>>>>> have
>>>>>>> no need for a disambiguating label as they're often just writing unit 
>

Re: [QUESTION] Why no auto labels?

2023-10-04 Thread Robert Bradshaw via user
BeamJava and BeamPython have the exact same behavior: transform names
within must be distinct [1]. This is because we do not necessarily know at
pipeline construction time if the pipeline will be streaming or batch, or
if it will be updated in the future, so the decision was made to impose
this restriction up front. Both will auto-generate a name for you if one is
not given, but will do so deterministically (not depending on some global
context) to avoid potential update problems.

[1] Note that this applies to the fully qualified transform name, so the
naming only has to be distinct within a composite transform (or at the top
level--the pipeline itself is isomorphic to a single composite transform).
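
For illustration, a minimal Python sketch of the usual workaround when the
same callable is applied twice at the same level: give each application an
explicit, unique label (the labels below are just placeholders):

```
import apache_beam as beam

def identity_filter(x):
    return True  # placeholder predicate

with beam.Pipeline() as p:
    filtered = (
        p
        | beam.Create(["a", "b", "c"])
        # Reusing the same callable needs distinct labels, otherwise the
        # auto-generated transform names collide.
        | "FilterOnce" >> beam.Filter(identity_filter)
        | "FilterTwice" >> beam.Filter(identity_filter))
```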

On Wed, Oct 4, 2023 at 3:43 AM Joey Tran  wrote:

> Cross posting this thread to dev@ to see if this is intentional behavior
> or if it's something worth changing for the python SDK
>
> On Tue, Oct 3, 2023, 10:10 PM XQ Hu via user  wrote:
>
>> That suggests the default label is created as that, which indeed causes
>> the duplication error.
>>
>> On Tue, Oct 3, 2023 at 9:15 PM Joey Tran 
>> wrote:
>>
>>> Not sure what that suggests
>>>
>>> On Tue, Oct 3, 2023, 6:24 PM XQ Hu via user 
>>> wrote:
>>>
 Looks like this is the current behaviour. If you have `t =
 beam.Filter(identity_filter)`, `t.label` is defined as
 `Filter(identity_filter)`.

 On Mon, Oct 2, 2023 at 9:25 AM Joey Tran 
 wrote:

> You don't have to specify the names if the callable you pass in is
> /different/ for two `beam.Map`s, but  if the callable is the same you must
> specify a label. For example, the below will raise an exception:
>
> ```
> | beam.Filter(identity_filter)
> | beam.Filter(identity_filter)
> ```
>
> Here's an example on playground that shows the error message you get
> [1]. I marked every line I added with a "# ++".
>
> It's a contrived example, but using a map or filter at the same
> pipeline level probably comes up often, at least in my inexperience. For
> example, you might have a pipeline that partitions a pcoll into three
> different pcolls, runs some transforms on them, and then runs the same 
> type
> of filter on each of them.
>
> The case that happens most often for me is using the `assert_that` [2]
> testing transform. In this case, I think often users will really have no
> need for a disambiguating label as they're often just writing unit tests
> that test a few different properties of their workflow.
>
> [1] https://play.beam.apache.org/?sdk=python&shared=hIrm7jvCamW
> [2]
> https://beam.apache.org/releases/pydoc/2.29.0/apache_beam.testing.util.html#apache_beam.testing.util.assert_that
>
> On Mon, Oct 2, 2023 at 9:08 AM Bruno Volpato via user <
> user@beam.apache.org> wrote:
>
>> If I understand the question correctly, you don't have to specify
>> those names.
>>
>> As Reuven pointed out, it is probably a good idea so you have a
>> stable / deterministic graph.
>> But in the Python SDK, you can simply use pcollection | map_fn,
>> instead of pcollection | 'Map' >> map_fn.
>>
>> See an example here
>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/cookbook/group_with_coder.py#L100-L116
>>
>>
>> On Sun, Oct 1, 2023 at 9:08 PM Joey Tran 
>> wrote:
>>
>>> Hmm, I'm not sure what you mean by "updating pipelines in place".
>>> Can you elaborate?
>>>
>>> I forgot to mention my question is posed from the context of a
>>> python SDK user, and afaict, there doesn't seem to be an obvious way to
>>> autogenerate names/labels. Hearing that the java SDK supports it makes 
>>> me
>>> wonder if the python SDK could support it as well though... (If so, I'd 
>>> be
>>> happy to do implement it). Currently, it's fairly tedious to have to 
>>> name
>>> every instance of a transform that you might reuse in a pipeline, e.g. 
>>> when
>>> reapplying the same Map on different pcollections.
>>>
>>> On Sun, Oct 1, 2023 at 8:12 PM Reuven Lax via user <
>>> user@beam.apache.org> wrote:
>>>
 Are you talking about transform names? The main reason was because
 for runners that support updating pipelines in place, the only way to 
 do so
 safely is if the runner can perfectly identify which transforms in the 
 new
 graph match the ones in the old graph. There's no good way to auto 
 generate
 names that will stay stable across updates - even small changes to the
 pipeline might change the order of nodes in the graph, which could 
 result
 in a corrupted update.

 However, if you don't care about update, Beam can auto generate
 these names for you! When you call PCollection.apply (if using 
 BeamJava),
>

Re: UDF/UADF over complex structures

2023-09-28 Thread Robert Bradshaw via user
Yes, for sure. This is one of the areas where Beam excels vs. simpler tools
like SQL. You can write arbitrary code to iterate over arbitrary structures
in the typical Java/Python/Go/TypeScript/Scala/[pick your language] way. In
Beam nomenclature, UDFs correspond to DoFns and UDAFs correspond to
CombineFns.
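
As a rough sketch of the idea (shown in Python for brevity; the Java version
has the same shape with DoFn / Combine.CombineFn subclasses, and the nested
record layout here is made up for illustration):

```
import apache_beam as beam

# Made-up nested records, just to have something concrete to iterate over.
records = [
    {"user": {"id": 1, "scores": [3, 5]}},
    {"user": {"id": 2, "scores": [7]}},
]

class TotalScores(beam.CombineFn):
    """UDAF analog: aggregates over a nested list field."""
    def create_accumulator(self):
        return 0

    def add_input(self, acc, record):
        return acc + sum(record["user"]["scores"])

    def merge_accumulators(self, accs):
        return sum(accs)

    def extract_output(self, acc):
        return acc

with beam.Pipeline() as p:
    nested = p | beam.Create(records)
    # UDF analog: arbitrary per-element code that emits another nested dict.
    summaries = nested | beam.Map(
        lambda r: {"user_id": r["user"]["id"],
                   "best": max(r["user"]["scores"])})
    totals = nested | beam.CombineGlobally(TotalScores())
```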

On Thu, Sep 28, 2023 at 4:23 AM Balogh, György  wrote:

> Hi,
> I've complex nested structure in my input data. Is it possible to have
> UDF/UDAF taking nested structure as input? I'm using java.
> Outputting nested structure is also a question.
> Thank you,
> Gyorgy
> --
>
> György Balogh
> CTO
> E gyorgy.bal...@ultinous.com 
> M +36 30 270 8342 <+36%2030%20270%208342>
> A HU, 1117 Budapest, Budafoki út 209.
> W www.ultinous.com
>


Re: [Question] Side Input pattern

2023-09-15 Thread Robert Bradshaw via user
Yes, if you block that bundle will not progress. Generally streaming
pipelines are processing many (hundreds) of bundles in parallel (e.g. one
per key on dataflow), but at the source there may not be as much available
parallelism and it's better to return than wait if there are no elements
left to read.

On Fri, Sep 15, 2023 at 10:42 AM Ruben Vargas 
wrote:

> Hello thanks for the reply
>
> I was digging into the UnboundedReader interface, and I observed that some
> implementations block the entire progress of the other inputs when they get
> blocked into the advance() method, (probably waiting if there are new
> elements or not), an example of this is the AWS SQSIO  implementation. if I
> return true or false immediately the progress of the main input continues,
> but If I wait for results on the advance() method, all of other inputs get
> blocked
>
>
> Is that assumption correct?
>
>
>
> El El vie, 15 de septiembre de 2023 a la(s) 10:59, Robert Bradshaw via
> user  escribió:
>
>> Beam will block on side inputs until at least one value is available (or
>> the watermark has advanced such that we can be sure one will never become
>> available, which doesn't really apply to the global window case).
>> After that, workers generally cache the side input value (for performance
>> reasons) but may periodically re-fetch it (the exact cadence probably
>> depends on the runner implementation).
>>
>> On Tue, Sep 12, 2023 at 10:34 PM Ruben Vargas 
>> wrote:
>>
>>> Hello Everyone
>>>
>>> I have a question, I have on my pipeline one side input that
>>> fetches some configurations from an API endpoint each 30 seconds, my
>>> question is this.
>>>
>>>
>>> I have something similar to what is showed in the side input patterns
>>> documentation
>>>
>>>  PCollectionView<Map<String, String>> map =
>>> p.apply(GenerateSequence.from(0).withRate(1,
>>> Duration.standardSeconds(5L)))
>>> .apply(
>>> ParDo.of(
>>> new DoFn<Long, Map<String, String>>() {
>>>
>>>   @ProcessElement
>>>   public void process(
>>>   @Element Long input,
>>>   @Timestamp Instant timestamp,
>>>   OutputReceiver<Map<String, String>> o) {
>>> call HTTP endpoint here!!
>>>   }
>>> }))
>>> .apply(
>>> Window.<Map<String, String>>into(new GlobalWindows())
>>>
>>> .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
>>> .discardingFiredPanes())
>>> .apply(Latest.globally())
>>> .apply(View.asSingleton());
>>>
>>> What happens if for example the HTTP endpoint takes time to respond due
>>> some network issues and/or the amount of data. Is this gonna introduce
>>> delays on my main pipeline? Is the main pipeline blocked until the pardo
>>> that processes the side input ends?
>>>
>>> I don't care too much about the consistency here, I mean if the
>>> configuration changed in the Time T1 I don't care if some registries with
>>> T2 timestamp are processed with the configuration version of T1.
>>>
>>>
>>> Regards.
>>>
>>>


Re: [Question] Side Input pattern

2023-09-15 Thread Robert Bradshaw via user
Beam will block on side inputs until at least one value is available (or
the watermark has advanced such that we can be sure one will never become
available, which doesn't really apply to the global window case).
After that, workers generally cache the side input value (for performance
reasons) but may periodically re-fetch it (the exact cadence probably
depends on the runner implementation).

On Tue, Sep 12, 2023 at 10:34 PM Ruben Vargas 
wrote:

> Hello Everyone
>
> I have a question, I have on my pipeline one side input that fetches some
> configurations from an API endpoint each 30 seconds, my question is this.
>
>
> I have something similar to what is showed in the side input patterns
> documentation
>
>  PCollectionView<Map<String, String>> map =
> p.apply(GenerateSequence.from(0).withRate(1,
> Duration.standardSeconds(5L)))
> .apply(
> ParDo.of(
> new DoFn<Long, Map<String, String>>() {
>
>   @ProcessElement
>   public void process(
>   @Element Long input,
>   @Timestamp Instant timestamp,
>   OutputReceiver<Map<String, String>> o) {
> call HTTP endpoint here!!
>   }
> }))
> .apply(
> Window.<Map<String, String>>into(new GlobalWindows())
>
> .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
> .discardingFiredPanes())
> .apply(Latest.globally())
> .apply(View.asSingleton());
>
> What happens if for example the HTTP endpoint takes time to respond due
> some network issues and/or the amount of data. Is this gonna introduce
> delays on my main pipeline? Is the main pipeline blocked until the pardo
> that processes the side input ends?
>
> I don't care too much about the consistency here, I mean if the
> configuration changed in the Time T1 I don't care if some registries with
> T2 timestamp are processed with the configuration version of T1.
>
>
> Regards.
>
>


Re: "Decorator" pattern for PTramsforms

2023-09-15 Thread Robert Bradshaw via user
On Fri, Sep 15, 2023 at 9:46 AM Reuven Lax via user 
wrote:

> Creating composite DoFns is tricky today due to how they are implemented
> (via annotated methods).
>

Note that this depends on the language. This should be really easy to do
from Python.
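
For example, a minimal sketch of composing DoFns by hand in Python, assuming
the inner DoFn only does plain element-wise processing (no state, timers, or
tagged outputs):

```
import apache_beam as beam

class WrappedDoFn(beam.DoFn):
    """Delegates to an inner DoFn and post-processes each of its outputs."""
    def __init__(self, inner, post_process):
        self._inner = inner
        self._post_process = post_process

    def setup(self):
        self._inner.setup()

    def process(self, element, *args, **kwargs):
        # DoFn.process may return a generator, a list, or None.
        for output in self._inner.process(element, *args, **kwargs) or ():
            yield self._post_process(output)

    def teardown(self):
        self._inner.teardown()
```

It would be applied as beam.ParDo(WrappedDoFn(SomeDoFn(), post_fn)), where
SomeDoFn and post_fn are placeholders.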


> However providing such a method to compose DoFns would be very useful IMO.
>

+1


> On Fri, Sep 15, 2023 at 9:33 AM Joey Tran 
> wrote:
>
>> Yeah for (1) the concern would be adding a shuffle/fusion break and (2)
>> sounds like the likely solution, was just hoping there'd be one that could
>> wrap at the PTransform level but I realize now the PTransform abstraction
>> is too general as you mentioned to do something like that.
>>
>> (2) will be likely what we do, though now I'm wondering if it might be
>> possible to create a ParDo wrapper that can take a ParDo, extract it's
>> dofn, wrap it, and return a new ParDo
>>
>> On Fri, Sep 15, 2023, 11:53 AM Robert Bradshaw via user <
>> user@beam.apache.org> wrote:
>>
>>> +1 to looking at composite transforms. You could even have a composite
>>> transform that takes another transform as one of its construction arguments
>>> and whose expand method does pre- and post-processing to the inputs/outputs
>>> before/after applying the transform in question. (You could even implement
>>> this as a Python decorator if you really wanted, either decorating the
>>> expand method itself or the full class...)
>>>
>>> One of the difficulties is that for a general transform there isn't
>>> necessarily a 1:N relationship between outputs and inputs as one has for a
>>> DoFn (especially if there is any aggregation involved). There are, however,
>>> two partial solutions that might help.
>>>
>>> (1) You can do a CombineGlobally with a CombineFn (Like Sample) that
>>> returns at most N elements. You could do this with a CombinePerKey if you
>>> can come up with a reasonable key (e.g. the id of your input elements) that
>>> the limit should be a applied to. Note that this may cause a lot of data to
>>> be shuffled (though due to combiner lifting, no more than N per bundle).
>>>
>>> (2) You could have a DoFn that limits to N per bundle by initializing a
>>> counter in its start_bundle and passing elements through until the counter
>>> reaches a threshold. (Again, one could do this per id if one is available.)
>>> It wouldn't stop production of the elements, but if things get fused it
>>> would still likely be fairly cheap.
>>>
>>> Both of these could be prepended to the problematic consuming PTransform
>>> as well.
>>>
>>> - Robert
>>>
>>>
>>>
>>> On Fri, Sep 15, 2023 at 8:13 AM Joey Tran 
>>> wrote:
>>>
>>>> I'm aware of composite transforms and of the distributed nature of
>>>> PTransforms. I'm not suggesting limiting the entire set and my example was
>>>> more illustrative than the actual use case.
>>>>
>>>> My actual use case is basically: I have multiple PTransforms, and let's
>>>> say most of them average ~100 generated outputs for a single input. Most of
>>>> these PTransforms will occasionally run into an input though that might
>>>> output maybe 1M outputs. This can cause issues if for example there are
>>>> transforms that follow it that require a lot of compute per input.
>>>>
>>>> The simplest way to deal with this is to modify the `DoFn`s in our
>>>> Ptransforms and add a limiter in the logic (e.g. `if num_outputs_generated
>>>> >= OUTPUTS_PER_INPUT_LIMIT: return`). We could duplicate this logic across
>>>> our transforms, but it'd be much cleaner if we could lift up this limiting
>>>> logic out of the application logic and have some generic wrapper that
>>>> extends our transforms.
>>>>
>>>> Thanks for the discussion!
>>>>
>>>> On Fri, Sep 15, 2023 at 10:29 AM Alexey Romanenko <
>>>> aromanenko@gmail.com> wrote:
>>>>
>>>>> I don’t think it’s possible to extend in a way that you are asking
>>>>> (like, Java classes “*extend*"). Though, you can create your own
>>>>> composite PTransform that will incorporate one or several others inside
>>>>> *“expand()”* method. Actually, most of the Beam native PTransforms
>>>>> are composite transforms. Please, take a look on doc and examples [1]
>>>>>
>>>>

Re: "Decorator" pattern for PTramsforms

2023-09-15 Thread Robert Bradshaw via user
+1 to looking at composite transforms. You could even have a composite
transform that takes another transform as one of its construction arguments
and whose expand method does pre- and post-processing to the inputs/outputs
before/after applying the transform in question. (You could even implement
this as a Python decorator if you really wanted, either decorating the
expand method itself or the full class...)
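
A rough sketch of that shape in Python (the wrapper name and the
post-processing step are illustrative, not an existing Beam API):

```
import apache_beam as beam

class WithPostProcessing(beam.PTransform):
    """Applies the wrapped transform, then post-processes its output."""
    def __init__(self, inner, post_fn):
        super().__init__()
        self._inner = inner
        self._post_fn = post_fn

    def expand(self, pcoll):
        return (
            pcoll
            | "Inner" >> self._inner
            | "Post" >> beam.Map(self._post_fn))
```

Usage would look like pcoll | WithPostProcessing(SomeTransform(), cleanup_fn),
with both arguments being placeholders.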

One of the difficulties is that for a general transform there isn't
necessarily a 1:N relationship between outputs and inputs as one has for a
DoFn (especially if there is any aggregation involved). There are, however,
two partial solutions that might help.

(1) You can do a CombineGlobally with a CombineFn (Like Sample) that
returns at most N elements. You could do this with a CombinePerKey if you
can come up with a reasonable key (e.g. the id of your input elements) that
the limit should be applied to. Note that this may cause a lot of data to
be shuffled (though due to combiner lifting, no more than N per bundle).

(2) You could have a DoFn that limits to N per bundle by initializing a
counter in its start_bundle and passing elements through until the counter
reaches a threshold. (Again, one could do this per id if one is available.)
It wouldn't stop production of the elements, but if things get fused it
would still likely be fairly cheap.

Both of these could be prepended to the problematic consuming PTransform as
well.
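
As a minimal sketch of option (2) in Python (the class name and limit are
illustrative):

```
import apache_beam as beam

class LimitPerBundle(beam.DoFn):
    """Passes through at most `limit` elements per bundle."""
    def __init__(self, limit):
        self._limit = limit

    def start_bundle(self):
        self._emitted = 0

    def process(self, element):
        if self._emitted < self._limit:
            self._emitted += 1
            yield element
```

Option (1) can be approximated with the built-in sampler, e.g.
`beam.combiners.Sample.FixedSizeGlobally(1000)` followed by a
`beam.FlatMap(lambda sample: sample)` to re-flatten the sampled list.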

- Robert



On Fri, Sep 15, 2023 at 8:13 AM Joey Tran  wrote:

> I'm aware of composite transforms and of the distributed nature of
> PTransforms. I'm not suggesting limiting the entire set and my example was
> more illustrative than the actual use case.
>
> My actual use case is basically: I have multiple PTransforms, and let's
> say most of them average ~100 generated outputs for a single input. Most of
> these PTransforms will occasionally run into an input though that might
> output maybe 1M outputs. This can cause issues if for example there are
> transforms that follow it that require a lot of compute per input.
>
> The simplest way to deal with this is to modify the `DoFn`s in our
> Ptransforms and add a limiter in the logic (e.g. `if num_outputs_generated
> >= OUTPUTS_PER_INPUT_LIMIT: return`). We could duplicate this logic across
> our transforms, but it'd be much cleaner if we could lift up this limiting
> logic out of the application logic and have some generic wrapper that
> extends our transforms.
>
> Thanks for the discussion!
>
> On Fri, Sep 15, 2023 at 10:29 AM Alexey Romanenko <
> aromanenko@gmail.com> wrote:
>
>> I don’t think it’s possible to extend in a way that you are asking (like,
>> Java classes “*extend*"). Though, you can create your own composite
>> PTransform that will incorporate one or several others inside
>> *“expand()”* method. Actually, most of the Beam native PTransforms are
>> composite transforms. Please, take a look on doc and examples [1]
>>
>> Regarding your example, please, be aware that all PTransforms are
>> supposed to be executed in distributed environment and the order of records
>> is not guaranteed. So, limiting the whole output by fixed number of records
>> can be challenging - you’d need to make sure that it will be processed on
>> only one worker, that means that you’d need to shuffle all your records by
>> the same key and probably sort the records in way that you need.
>>
>> Did you consider to use “*org.apache.beam.sdk.transforms.Top*” for that?
>> [2]
>>
>> If it doesn’t work for you, could you provide more details of your use
>> case? Then we probably can propose the more suitable solutions for that.
>>
>> [1]
>> https://beam.apache.org/documentation/programming-guide/#composite-transforms
>> [2]
>> https://beam.apache.org/releases/javadoc/2.50.0/org/apache/beam/sdk/transforms/Top.html
>>
>> —
>> Alexey
>>
>> On 15 Sep 2023, at 14:22, Joey Tran  wrote:
>>
>> Is there a way to extend already defined PTransforms? My question is
>> probably better illustrated with an example. Let's say I have a PTransform
>> that generates a very variable number of outputs. I'd like to "wrap" that
>> PTransform such that if it ever creates more than say 1,000 outputs, then I
>> just take the first 1,000 outputs without generating the rest of the
>> outputs.
>>
>> It'd be trivial if I have access to the DoFn, but what if the PTransform
>> in question doesn't expose the `DoFn`?
>>
>>
>>


Re: Options for visualizing the pipeline DAG

2023-09-01 Thread Robert Bradshaw via user
(As an aside, I think all of these options would make for a great blog post
if anyone is interested in authoring one of those...)

On Fri, Sep 1, 2023 at 9:26 AM Robert Bradshaw  wrote:

> You can also use Python's RenderRunner, e.g.
>
>   python -m apache_beam.examples.wordcount --output out.txt \
> --runner=apache_beam.runners.render.RenderRunner \
> --render_output=pipeline.svg
>
> This also has an interactive mode, triggered by passing --port=N (where 0
> can be used to pick an unused port) which vends the graph as a local web
> service. This allows one to expand/collapse composites for easier
> exploration. Any --render_output arguments that are passed will get
> re-rendered as you edit the graph. (It uses graphviz under the hood, so can
> render any of those supported formats.)
>
> For rendering non-Python pipelines, one can start this up as a local
> portable "runner"
>
>   python -m apache_beam.runners.render
>
> and then "submit" this job from your other SDK over the jobs API to view
> it.
>
> [image: pipeline.png]
>
>
>
> On Fri, Sep 1, 2023 at 7:13 AM Joey Tran 
> wrote:
>
>> Perfect, `pipeline_graph` python module in the stack overflow post [1]
>> was exactly what I was looking for. The dependencies I'm working with are a
>> bit heavyweight and likely difficult to install into a notebook, so I was
>> looking for something I could do on my local machine.
>>
>> Thanks!
>> Joey
>>
>> [1] -
>> https://stackoverflow.com/questions/72592971/way-to-visualize-beam-pipeline-run-with-directrunner
>>
>> On Fri, Sep 1, 2023 at 8:40 AM Danny McCormick via user <
>> user@beam.apache.org> wrote:
>>
>>> Hey Joey,
>>>
>>> Dataflow and Beam playground are 2 options as you mentioned, locally
>>> many SDKs have local runner options with a visual component. For example,
>>> in Python you can use the interactive runner with the
>>> apache-beam-jupyterlab-sidepanel extension
>>> to view pipelines visually locally (this is similar to what the notebooks
>>> you reference are doing). You can also just call some of these pieces
>>> directly without an extension. Go has a dot runner
>>> that produces a visual representation of a pipeline. Java has a similar dot
>>> renderer.
>>>
>>> Thanks,
>>> Danny
>>>
>>> On Thu, Aug 31, 2023 at 6:38 PM Joey Tran 
>>> wrote:
>>>
 Hi all,

 What're all the current options for visualizing a pipeline? I'm
 guessing Dataflow has a visualization. I saw that there are also Apache
 Beam notebooks through GCP, and I'm aware of the Beam playground, but is
 there an easy way to create and view the visualization locally? For
 example, I might have a large codebase that's used to construct and run a
 pipeline, and in this case I don't think any of those three solutions would
 be very easy to use to visualize my pipeline (though I could be wrong)

 Best,
 Joey

 --

 Joey Tran | Senior Developer Il | AutoDesigner TL

 *he/him*

 [image: Schrödinger, Inc.] 

>>>


Re: Options for visualizing the pipeline DAG

2023-09-01 Thread Robert Bradshaw via user
You can also use Python's RenderRunner, e.g.

  python -m apache_beam.examples.wordcount --output out.txt \
--runner=apache_beam.runners.render.RenderRunner \
--render_output=pipeline.svg

This also has an interactive mode, triggered by passing --port=N (where 0
can be used to pick an unused port) which vends the graph as a local web
service. This allows one to expand/collapse composites for easier
exploration. Any --render_output arguments that are passed will get
re-rendered as you edit the graph. (It uses graphviz under the hood, so can
render any of those supported formats.)

For rendering non-Python pipelines, one can start this up as a local
portable "runner"

  python -m apache_beam.runners.render

and then "submit" this job from your other SDK over the jobs API to view
it.

[image: pipeline.png]



On Fri, Sep 1, 2023 at 7:13 AM Joey Tran  wrote:

> Perfect, `pipeline_graph` python module in the stack overflow post [1] was
> exactly what I was looking for. The dependencies I'm working with are a bit
> heavyweight and likely difficult to install into a notebook, so I was
> looking for something I could do on my local machine.
>
> Thanks!
> Joey
>
> [1] -
> https://stackoverflow.com/questions/72592971/way-to-visualize-beam-pipeline-run-with-directrunner
>
> On Fri, Sep 1, 2023 at 8:40 AM Danny McCormick via user <
> user@beam.apache.org> wrote:
>
>> Hey Joey,
>>
>> Dataflow and Beam playground are 2 options as you mentioned, locally many
>> SDKs have local runner options with a visual component. For example, in
>> Python you can use the interactive runner with the
>> apache-beam-jupyterlab-sidepanel extension
>> to view pipelines visually locally (this is similar to what the notebooks
>> you reference are doing). You can also just call some of these pieces
>> directly without an extension. Go has a dot runner
>> that produces a visual representation of a pipeline. Java has a similar dot
>> renderer.
>>
>> Thanks,
>> Danny
>>
>> On Thu, Aug 31, 2023 at 6:38 PM Joey Tran 
>> wrote:
>>
>>> Hi all,
>>>
>>> What're all the current options for visualizing a pipeline? I'm guessing
>>> Dataflow has a visualization. I saw that there are also Apache Beam
>>> notebooks through GCP, and I'm aware of the Beam playground, but is there
>>> an easy way to create and view the visualization locally? For example, I
>>> might have a large codebase that's used to construct and run a pipeline,
>>> and in this case I don't think any of those three solutions would be very
>>> easy to use to visualize my pipeline (though I could be wrong)
>>>
>>> Best,
>>> Joey
>>>
>>> --
>>>
>>> Joey Tran | Senior Developer Il | AutoDesigner TL
>>>
>>> *he/him*
>>>
>>> [image: Schrödinger, Inc.] 
>>>
>>


Re: [Request for Feedback] Swift SDK Prototype

2023-08-24 Thread Robert Bradshaw via user
On Thu, Aug 24, 2023 at 12:58 PM Chamikara Jayalath 
wrote:

>
>
> On Thu, Aug 24, 2023 at 12:27 PM Robert Bradshaw 
> wrote:
>
>> I would like to figure out a way to get the stream-y interface to work,
>> as I think it's more natural overall.
>>
>> One hypothesis is that if any elements are carried over loop iterations,
>> there will likely be some that are carried over beyond the loop (after all
>> the callee doesn't know when the loop is supposed to end). We could reject
>> "plain" elements that are emitted after this point, requiring one to emit
>> timestamp-windowed-values.
>>
>
> Are you assuming that the same stream (or overlapping sets of data) are
> pushed to multiple workers ? I thought that the set of data streamed here
> are the data that belong to the current bundle (hence already assigned to
> the current worker) so any output from the current bundle invocation would
> be a valid output of that bundle.
>
>>
Yes, the content of the stream is exactly the contents of the bundle. The
question is how to do the input_element:output_element correlation for
automatically propagating metadata.


> Related to this, we could enforce that the only (user-accessible) way to
>> get such a timestamped value is to start with one, e.g. a
>> WindowedValue.withValue(O) produces a WindowedValue with the same
>> metadata but a new value. Thus a user wanting to do anything "fancy" would
>> have to explicitly request iteration over these windowed values rather than
>> over the raw elements. (This is also forward compatible with expanding the
>> metadata that can get attached, e.g. pane infos, and makes the right thing
>> the easiest/most natural.)
>>
>> On Thu, Aug 24, 2023 at 12:10 PM Byron Ellis 
>> wrote:
>>
>>> Ah, that is a good point—being element-wise would make managing windows
>>> and time stamps easier for the user. Fortunately it’s a fairly easy change
>>> to make and maybe even less typing for the user. I was originally thinking
>>> side inputs and metrics would happen outside the loop, but I think you want
>>> a class and not a closure at that point for sanity.
>>>
>>> On Thu, Aug 24, 2023 at 12:02 PM Robert Bradshaw 
>>> wrote:
>>>
 Ah, I see.

 Yeah, I've thought about using an iterable for the whole bundle rather
 than start/finish bundle callbacks, but one of the questions is how that
 would impact implicit passing of the timestamp (and other) metadata from
 input elements to output elements. (You can of course attach the metadata
 to any output that happens in the loop body, but it's very easy to
 implicitly break the 1:1 relationship here (e.g. by doing buffering or
 otherwise modifying local state) and this would be hard to detect. (I
 suppose trying to output after the loop finishes could require
 something more explicit).


 On Wed, Aug 23, 2023 at 6:56 PM Byron Ellis 
 wrote:

> Oh, I also forgot to mention that I included element-wise collection
> operations like "map" that eliminate the need for pardo in many cases. the
> groupBy command is actually a map + groupByKey under the hood. That was to
> be more consistent with Swift's collection protocol (and is also why
> PCollection and PCollectionStream are different types... PCollection
> implements map and friends as pipeline construction operations whereas
> PCollectionStream is an actual stream)
>
> I just happened to push some "IO primitives" that uses map rather than
> pardo in a couple of places to do a true wordcount using good ol'
> Shakespeare and very very primitive GCS IO.
>
> Best,
> B
>
> On Wed, Aug 23, 2023 at 6:08 PM Byron Ellis 
> wrote:
>
>> Indeed :-) Yeah, I went back and forth on the pardo syntax quite a
>> bit before settling on where I ended up. Ultimately I decided to go with
>> something that felt more Swift-y than anything else which means that 
>> rather
>> than dealing with a single element like you do in the other SDKs you're
>> dealing with a stream of elements (which of course will often be of size
>> 1). That's a really natural paradigm in the Swift world especially with 
>> the
>> async / await structures. So when you see something like:
>>
>> pardo(name:"Read Files") { filenames,output,errors in
>>
>> for try await (filename,_,_) in filenames {
>>   ...
>>   output.emit(data)
>>
>> }
>>
>> filenames is the input stream and then output and errors are both
>> output streams. In theory you can have as many output streams as you like
>> though at the moment there's a compiler bug in the new type pack feature
>> that limits it to "as many as I felt like supporting". Presumably this 
>> will
>> get fixed before the official 5.9 release which will probably be in the
>> October timeframe if history is any guide)
>>
>> If you had parameterization you wanted to send that would 

Re: [Request for Feedback] Swift SDK Prototype

2023-08-24 Thread Robert Bradshaw via user
I would like to figure out a way to get the stream-y interface to work, as
I think it's more natural overall.

One hypothesis is that if any elements are carried over loop iterations,
there will likely be some that are carried over beyond the loop (after all
the callee doesn't know when the loop is supposed to end). We could reject
"plain" elements that are emitted after this point, requiring one to emit
timestamp-windowed-values.

Related to this, we could enforce that the only (user-accessible) way to
get such a timestamped value is to start with one, e.g. a
WindowedValue.withValue(O) produces a WindowedValue with the same
metadata but a new value. Thus a user wanting to do anything "fancy" would
have to explicitly request iteration over these windowed values rather than
over the raw elements. (This is also forward compatible with expanding the
metadata that can get attached, e.g. pane infos, and makes the right thing
the easiest/most natural.)

On Thu, Aug 24, 2023 at 12:10 PM Byron Ellis  wrote:

> Ah, that is a good point—being element-wise would make managing windows
> and time stamps easier for the user. Fortunately it’s a fairly easy change
> to make and maybe even less typing for the user. I was originally thinking
> side inputs and metrics would happen outside the loop, but I think you want
> a class and not a closure at that point for sanity.
>
> On Thu, Aug 24, 2023 at 12:02 PM Robert Bradshaw 
> wrote:
>
>> Ah, I see.
>>
>> Yeah, I've thought about using an iterable for the whole bundle rather
>> than start/finish bundle callbacks, but one of the questions is how that
>> would impact implicit passing of the timestamp (and other) metadata from
>> input elements to output elements. (You can of course attach the metadata
>> to any output that happens in the loop body, but it's very easy to
>> implicitly break the 1:1 relationship here (e.g. by doing buffering or
>> otherwise modifying local state) and this would be hard to detect. (I
>> suppose trying to output after the loop finishes could require
>> something more explicit).
>>
>>
>> On Wed, Aug 23, 2023 at 6:56 PM Byron Ellis 
>> wrote:
>>
>>> Oh, I also forgot to mention that I included element-wise collection
>>> operations like "map" that eliminate the need for pardo in many cases. the
>>> groupBy command is actually a map + groupByKey under the hood. That was to
>>> be more consistent with Swift's collection protocol (and is also why
>>> PCollection and PCollectionStream are different types... PCollection
>>> implements map and friends as pipeline construction operations whereas
>>> PCollectionStream is an actual stream)
>>>
>>> I just happened to push some "IO primitives" that uses map rather than
>>> pardo in a couple of places to do a true wordcount using good ol'
>>> Shakespeare and very very primitive GCS IO.
>>>
>>> Best,
>>> B
>>>
>>> On Wed, Aug 23, 2023 at 6:08 PM Byron Ellis 
>>> wrote:
>>>
 Indeed :-) Yeah, I went back and forth on the pardo syntax quite a bit
 before settling on where I ended up. Ultimately I decided to go with
 something that felt more Swift-y than anything else which means that rather
 than dealing with a single element like you do in the other SDKs you're
 dealing with a stream of elements (which of course will often be of size
 1). That's a really natural paradigm in the Swift world especially with the
 async / await structures. So when you see something like:

 pardo(name:"Read Files") { filenames,output,errors in

 for try await (filename,_,_) in filenames {
   ...
   output.emit(data)

 }

 filenames is the input stream and then output and errors are both
 output streams. In theory you can have as many output streams as you like
 though at the moment there's a compiler bug in the new type pack feature
 that limits it to "as many as I felt like supporting". Presumably this will
 get fixed before the official 5.9 release which will probably be in the
 October timeframe if history is any guide)

 If you had parameterization you wanted to send that would look like
 pardo("Parameter") { param,filenames,output,error in ... } where "param"
 would take on the value of "Parameter." All of this is being typechecked at
 compile time BTW.


 the (filename,_,_) is a tuple spreading construct like you have in ES6
 and other things where "_" is Swift for "ignore." In this case
 PCollectionStreams have an element signature of (Of,Date,Window) so you can
 optionally extract the timestamp and the window if you want to manipulate
 it somehow.

 That said it would also be natural to provide elementwise pardos---
 that would probably mean having explicit type signatures in the closure. I
 had that at one point, but it felt less natural the more I used it. I'm
 also slowly working towards adding a more "traditional" DoFn implementation
 approach where you implement 

Re: [Request for Feedback] Swift SDK Prototype

2023-08-24 Thread Robert Bradshaw via user
Ah, I see.

Yeah, I've thought about using an iterable for the whole bundle rather than
start/finish bundle callbacks, but one of the questions is how that would
impact implicit passing of the timestamp (and other) metadata from
input elements to output elements. (You can of course attach the metadata
to any output that happens in the loop body, but it's very easy to
implicitly break the 1:1 relationship here (e.g. by doing buffering or
otherwise modifying local state) and this would be hard to detect. (I
suppose trying to output after the loop finishes could require
something more explicit).


On Wed, Aug 23, 2023 at 6:56 PM Byron Ellis  wrote:

> Oh, I also forgot to mention that I included element-wise collection
> operations like "map" that eliminate the need for pardo in many cases. the
> groupBy command is actually a map + groupByKey under the hood. That was to
> be more consistent with Swift's collection protocol (and is also why
> PCollection and PCollectionStream are different types... PCollection
> implements map and friends as pipeline construction operations whereas
> PCollectionStream is an actual stream)
>
> I just happened to push some "IO primitives" that uses map rather than
> pardo in a couple of places to do a true wordcount using good ol'
> Shakespeare and very very primitive GCS IO.
>
> Best,
> B
>
> On Wed, Aug 23, 2023 at 6:08 PM Byron Ellis  wrote:
>
>> Indeed :-) Yeah, I went back and forth on the pardo syntax quite a bit
>> before settling on where I ended up. Ultimately I decided to go with
>> something that felt more Swift-y than anything else which means that rather
>> than dealing with a single element like you do in the other SDKs you're
>> dealing with a stream of elements (which of course will often be of size
>> 1). That's a really natural paradigm in the Swift world especially with the
>> async / await structures. So when you see something like:
>>
>> pardo(name:"Read Files") { filenames,output,errors in
>>
>> for try await (filename,_,_) in filenames {
>>   ...
>>   output.emit(data)
>>
>> }
>>
>> filenames is the input stream and then output and errors are both output
>> streams. In theory you can have as many output streams as you like though
>> at the moment there's a compiler bug in the new type pack feature that
>> limits it to "as many as I felt like supporting". Presumably this will get
>> fixed before the official 5.9 release which will probably be in the October
>> timeframe if history is any guide)
>>
>> If you had parameterization you wanted to send that would look like
>> pardo("Parameter") { param,filenames,output,error in ... } where "param"
>> would take on the value of "Parameter." All of this is being typechecked at
>> compile time BTW.
>>
>>
>> the (filename,_,_) is a tuple spreading construct like you have in ES6
>> and other things where "_" is Swift for "ignore." In this case
>> PCollectionStreams have an element signature of (Of,Date,Window) so you can
>> optionally extract the timestamp and the window if you want to manipulate
>> it somehow.
>>
>> That said it would also be natural to provide elementwise pardos--- that
>> would probably mean having explicit type signatures in the closure. I had
>> that at one point, but it felt less natural the more I used it. I'm also
>> slowly working towards adding a more "traditional" DoFn implementation
>> approach where you implement the DoFn as an object type. In that case it
>> would be very very easy to support both by having a default stream
>> implementation call the equivalent of processElement. To make that
>> performant I need to implement an @DoFn macro and I just haven't gotten to
>> it yet.
>>
>> It's a bit more work and I've been prioritizing implementing composite
>> and external transforms for the reasons you suggest. :-) I've got the
>> basics of a composite transform (there's an equivalent wordcount example)
>> and am hooking it into the pipeline generation, which should also give me
>> everything I need to successfully hook in external transforms as well. That
>> will give me the jump on IOs as you say. I can also treat the pipeline
>> itself as a composite transform which lets me get rid of the Pipeline {
>> pipeline in ... } and just instead have things attach themselves to the
>> pipeline implicitly.
>>
>> That said, there are some interesting IO possibilities that would be
>> Swift native. In particularly, I've been looking at the native Swift
>> binding for DuckDB (which is C++ based). DuckDB is SQL based but not
>> distributed in the same was as, say, Beam SQL... but it would allow for SQL
>> statements on individual files with projection pushdown supported for
>> things like Parquet which could have some cool and performant data lake
>> applications. I'll probably do a couple of the simpler IOs as
>> well---there's a Swift AWS SDK binding that's pretty good that would give
>> me S3 and there's a Cloud auth library as well that makes it pretty easy to
>> work with GCS.
>>
>> In any c

Re: [Request for Feedback] Swift SDK Prototype

2023-08-23 Thread Robert Bradshaw via user
Neat.

Nothing like writing and SDK to actually understand how the FnAPI works :).
I like the use of groupBy. I have to admit I'm a bit mystified by the
syntax for parDo (I don't know swift at all which is probably tripping me
up). The addition of external (cross-language) transforms could let you
steal everything (e.g. IOs) pretty quickly from other SDKs.

On Fri, Aug 18, 2023 at 7:55 AM Byron Ellis via user 
wrote:

> For everyone who is interested, here's the draft PR:
>
> https://github.com/apache/beam/pull/28062
>
> I haven't had a chance to test it on my M1 machine yet though (there's a
> good chance there are a few places that need to properly address
> endianness. Specifically timestamps in windowed values and length in
> iterable coders as those both use specifically bigendian representations)
>
>
> On Thu, Aug 17, 2023 at 8:57 PM Byron Ellis  wrote:
>
>> Thanks Cham,
>>
>> Definitely happy to open a draft PR so folks can comment---there's not as
>> much code as it looks like since most of the LOC is just generated
>> protobuf. As for the support, I definitely want to add external transforms
>> and may actually add that support before adding the ability to make
>> composites in the language itself. With the way the SDK is laid out adding
>> composites to the pipeline graph is a separate operation than defining a
>> composite.
>>
>> On Thu, Aug 17, 2023 at 4:28 PM Chamikara Jayalath 
>> wrote:
>>
>>> Thanks Byron. This sounds great. I wonder if there is interest in Swift
>>> SDK from folks currently subscribed to the +user 
>>>  list.
>>>
>>> On Wed, Aug 16, 2023 at 6:53 PM Byron Ellis via dev 
>>> wrote:
>>>
 Hello everyone,

 A couple of months ago I decided that I wanted to really understand how
 the Beam FnApi works and how it interacts with the Portable Runner. For me
 at least that usually means I need to write some code so I can see things
 happening in a debugger and to really prove to myself I understood what was
 going on I decided I couldn't use an existing SDK language to do it since
 there would be the temptation to read some code and convince myself that I
 actually understood what was going on.

 One thing led to another and it turns out that to get a minimal FnApi
 integration going you end up writing a fair bit of an SDK. So I decided to
 take things to a point where I had an SDK that could execute a word count
 example via a portable runner backend. I've now reached that point and
 would like to submit my prototype SDK to the list for feedback.

 It's currently living in a branch on my fork here:

 https://github.com/byronellis/beam/tree/swift-sdk/sdks/swift

 At the moment it runs via the most recent XCode Beta using Swift 5.9 on
 Intel Macs, but should also work using beta builds of 5.9 for Linux running
 on Intel hardware. I haven't had a chance to try it on ARM hardware and
 make sure all of the endian checks are complete. The
 "IntegrationTests.swift" file contains a word count example that reads some
 local files (as well as a missing file to exercise DLQ functionality) and
 output counts through two separate group by operations to get it past the
 "map reduce" size of pipeline. I've tested it against the Python Portable
 Runner. Since my goal was to learn FnApi there is no Direct Runner at this
 time.

 I've shown it to a couple of folks already and incorporated some of
 that feedback already (for example pardo was originally called dofn when
 defining pipelines). In general I've tried to make the API as "Swift-y" as
 possible, hence the heavy reliance on closures and while there aren't yet
 composite PTransforms there's the beginnings of what would be needed for a
 SwiftUI-like declarative API for creating them.

 There are of course a ton of missing bits still to be implemented, like
 counters, metrics, windowing, state, timers, etc.

>>>
>>> This should be fine and we can get the code documented without these
>>> features. I think support for composites and adding an external transform
>>> (see Java, Python, Go, TypeScript)
>>> to add support for multi-lang will bring in a lot of features (for example,
>>> I/O connectors) for free.
>>>
>>>

 Any and all feedback welcome and happy to submit a PR if folks are
 interested, though the "Swift Way" would be to have it in its own repo so
 that i

Re: Getting Started With Implementing a Runner

2023-07-24 Thread Robert Bradshaw via user
I took a first pass at
https://github.com/apache/beam/blob/be19140f3e9194721f36e57f4a946adc6c43971a/website/www/site/content/en/contribute/runner-guide.md

https://github.com/apache/beam/blob/1cfc0fdc6ff27ad70365683fdc8264f42642f6e9/sdks/python/apache_beam/runners/trivial_runner.py
may also be of interest.

On Fri, Jul 21, 2023 at 7:25 AM Joey Tran  wrote:
>
> Could you let me know when you update it? I would be interested in rereading 
> after the rewrite.
>
> Thanks!
> Joey
>
> On Fri, Jul 14, 2023 at 4:38 PM Robert Bradshaw  wrote:
>>
>> I'm taking an action item to update that page, as it is *way* out of date.
>>
>> On Thu, Jul 13, 2023 at 6:54 PM Joey Tran  wrote:
>>>
>>> I see. I guess I got a little confused since these are mentioned in the 
>>> Authoring a Runner docs page which implied to me that they'd be safe to 
>>> use. I'll check out the bundle_processor. Thanks!
>>>
>>> On Mon, Jul 10, 2023 at 1:07 PM Robert Bradshaw  wrote:
>>>>
>>>> On Sun, Jul 9, 2023 at 9:22 AM Joey Tran  wrote:
>>>>>
>>>>> Working on this on and off now and getting some pretty good traction.
>>>>>
>>>>> One thing I'm a little worried about is all the classes that are marked 
>>>>> "internal use only". A lot of these seem either very useful or possibly 
>>>>> critical to writing a runner. How strictly should I interpret these 
>>>>> private implementation labels?
>>>>>
>>>>> A few bits that I'm interested in using ordered by how surprised I was to 
>>>>> find that they're internal only.
>>>>>
>>>>>  - apache_beam.pipeline.AppliedPTransform
>>>>>  - apache_beam.pipeline.PipelineVisitor
>>>>>  - apache_beam.runners.common.DoFnRunner
>>>>
>>>>
>>>> The public API is the protos. You should not have to interact with 
>>>> AppliedPTransform and PipelineVisitor directly (and while you can reach in 
>>>> and do so, there are no promises here and these are subject to change). As 
>>>> for DoFnRunner, if you're trying to reach in at this level you're probably 
>>>> going to have to be replicating a bunch of surrounding infrastructure as 
>>>> well. I would recommend using a BundleProcessor [1] to coordinate the work 
>>>> (which will internally wire up the chain of DoFns correctly and take them 
>>>> through their proper lifecycle). As mentioned above, you can directly 
>>>> borrow the translations in fn_api_runner to go from a full Pipeline graph 
>>>> (proto) to a set of fused DoFns to execute in topological order (as 
>>>> ProcessBundleDescriptor protos, which is what BundleProcessor accepts).
>>>>
>>>> [1] 
>>>> https://github.com/apache/beam/blob/release-2.48.0/sdks/python/apache_beam/runners/worker/bundle_processor.py#L851
>>>>
>>>>>
>>>>> Thanks again for the help,
>>>>> Joey
>>>>>
>>>>> On Fri, Jun 23, 2023 at 8:34 PM Chamikara Jayalath  
>>>>> wrote:
>>>>>>
>>>>>> Another advantage of a portable runner would be that it will be using 
>>>>>> well defined and backwards compatible Beam portable APIs to communicate 
>>>>>> with SDKs. I think this is specially important for runners that do not 
>>>>>> live in the Beam repo since otherwise future SDK releases could break 
>>>>>> your runner in subtle ways. Also portability gives you more flexibility 
>>>>>> when it comes to choosing an SDK to define the pipeline and will allow 
>>>>>> you to execute transforms in any SDK via cross-language.
>>>>>>
>>>>>> Thanks,
>>>>>> Cham
>>>>>>
>>>>>> On Fri, Jun 23, 2023 at 1:57 PM Robert Bradshaw via user 
>>>>>>  wrote:
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Jun 23, 2023 at 1:43 PM Joey Tran  
>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>> Totally doable by one person, especially given the limited feature 
>>>>>>>>> set you mention above. 
>>>>>>>>> https://docs.google.com/presentation/d/1Cso0XP9dmj77OD9Bd53C1M3W1sPJF0ZnA20gzb2BPhE
>>>>>>>>>  is a good starting point as to what the relationship be

Re: Growing checkpoint size with Python SDF for reading from Redis streams

2023-07-20 Thread Robert Bradshaw via user
Your SDF looks fine. I wonder if there is an issue with how Flink is
implementing SDFs (e.g. not garbage collecting previous remainders).

On Tue, Jul 18, 2023 at 5:43 PM Nimalan Mahendran
 wrote:
>
> Hello,
>
> I am running a pipeline built in the Python SDK that reads from a Redis 
> stream via an SDF, in the following environment:
>
> Python 3.11
> Apache Beam 2.48.0
> Flink 1.16
> Checkpoint interval: 60s
> state.backend (Flink): hashmap
> state_backend (Beam): filesystem
>
> The issue that I am observing is that the checkpoint size keeps growing, even 
> when there are no items to read on the Redis stream. Since there are no items 
> to read on the Redis stream, the Redis stream SDF is simply doing the 
> following steps repeatedly, as part of DoFn.process, i.e. the pattern 
> described in the user-initiated checkpoint pattern in the Apache Beam 
> programming guide to handle polling for new items with some delay, if the 
> last poll returned no items:
>
> Make the call to the Redis client to read items from the Redis stream
> Receive no items from the Redis stream, and hence,
> Call tracker.defer_remainder(Duration.of(5)) and return-ing to defer 
> execution for 5 seconds. That code is located here.
> Go back to step 1.
>
> This checkpoint size growth happens regardless of whether I'm using 
> heap-based or RocksDB-based checkpoints. Eventually, the checkpoint grows 
> large enough to cause the task manager to crash, due to exhausting Java heap 
> space. The rate of checkpoint size growth is proportional to the number of 
> tracker.defer_remainder() calls I have done, i.e. increasing parallelism 
> and/or decreasing the timeout used in tracker.defer_remainder will increase 
> the rate of checkpoint growth.
>
> I took a look at the heap-based checkpoint files that I observed were getting 
> larger with each checkpoint (just using the less command) and noticed that 
> many copies of the residual restriction were present, which seemed like a red 
> flag. The residual restriction here is the one that results from calling 
> tracker.defer_remainder(), which results in a tracker.try_split(0.0).
>
> I've included the SDF code and jobmanager logs showing growing checkpoint 
> size here: https://gist.github.com/nybbles/1feafda7321f7c6a9e16c1455ebf0f3e. 
> I've included the restriction provider/tracker and other pieces for 
> completeness, but the SDF is towards the bottom.
>
> Any help would be appreciated! 🙏🏾
>
> Thanks,
> --
> Nimalan Mahendran
> ML Engineer at Liminal Insights


Re: Getting Started With Implementing a Runner

2023-07-14 Thread Robert Bradshaw via user
I'm taking an action item to update that page, as it is *way* out of date.

On Thu, Jul 13, 2023 at 6:54 PM Joey Tran  wrote:

> I see. I guess I got a little confused since these are mentioned in the 
> Authoring
> a Runner
> <https://beam.apache.org/contribute/runner-guide/#the-runner-api-protos> docs
> page which implied to me that they'd be safe to use. I'll check out the
> bundle_processor. Thanks!
>
> On Mon, Jul 10, 2023 at 1:07 PM Robert Bradshaw 
> wrote:
>
>> On Sun, Jul 9, 2023 at 9:22 AM Joey Tran 
>> wrote:
>>
>>> Working on this on and off now and getting some pretty good traction.
>>>
>>> One thing I'm a little worried about is all the classes that are marked
>>> "internal use only". A lot of these seem either very useful or possibly
>>> critical to writing a runner. How strictly should I interpret these private
>>> implementation labels?
>>>
>>> A few bits that I'm interested in using ordered by how surprised I was
>>> to find that they're internal only.
>>>
>>>  - apache_beam.pipeline.AppliedPTransform
>>>  - apache_beam.pipeline.PipelineVisitor
>>>  - apache_beam.runners.common.DoFnRunner
>>>
>>
>> The public API is the protos. You should not have to interact
>> with AppliedPTransform and PipelineVisitor directly (and while you can
>> reach in and do so, there are no promises here and these are subject to
>> change). As for DoFnRunner, if you're trying to reach in at this level
>> you're probably going to have to be replicating a bunch of surrounding
>> infrastructure as well. I would recommend using a BundleProcessor [1] to
>> coordinate the work (which will internally wire up the chain of DoFns
>> correctly and take them through their proper lifecycle). As mentioned
>> above, you can directly borrow the translations in fn_api_runner to go from
>> a full Pipeline graph (proto) to a set of fused DoFns to execute in
>> topological order (as ProcessBundleDescriptor protos, which is
>> what BundleProcessor accepts).
>>
>> [1]
>> https://github.com/apache/beam/blob/release-2.48.0/sdks/python/apache_beam/runners/worker/bundle_processor.py#L851
>>
>>
>>> Thanks again for the help,
>>> Joey
>>>
>>> On Fri, Jun 23, 2023 at 8:34 PM Chamikara Jayalath 
>>> wrote:
>>>
>>>> Another advantage of a portable runner would be that it will be using
>>>> well defined and backwards compatible Beam portable APIs to communicate
>>>> with SDKs. I think this is especially important for runners that do not live
>>>> in the Beam repo since otherwise future SDK releases could break your
>>>> runner in subtle ways. Also portability gives you more flexibility when it
>>>> comes to choosing an SDK to define the pipeline and will allow you to
>>>> execute transforms in any SDK via cross-language.
>>>>
>>>> Thanks,
>>>> Cham
>>>>
>>>> On Fri, Jun 23, 2023 at 1:57 PM Robert Bradshaw via user <
>>>> user@beam.apache.org> wrote:
>>>>
>>>>>
>>>>>
>>>>> On Fri, Jun 23, 2023 at 1:43 PM Joey Tran 
>>>>> wrote:
>>>>>
>>>>>> Totally doable by one person, especially given the limited feature
>>>>>>> set you mention above.
>>>>>>> https://docs.google.com/presentation/d/1Cso0XP9dmj77OD9Bd53C1M3W1sPJF0ZnA20gzb2BPhE
>>>>>>>  is
>>>>>>> a good starting point as to what the relationship between a Runner and 
>>>>>>> the
>>>>>>> SDK is at a level of detail sufficient for implementation (told from the
>>>>>>> perspective of an SDK, but the story is largely about the interface 
>>>>>>> which
>>>>>>> is directly applicable).
>>>>>>
>>>>>>
>>>>>> Great slides, I really appreciate the illustrations.
>>>>>>
>>>>>> I hadn't realized there was a concept of an "SDK Worker", I had
>>>>>> imagined that once the Runner started execution of a workflow, it was
>>>>>> Runner all the way down. Is the Fn API the only way to implement a 
>>>>>> runner?
>>>>>> Our execution environment is a bit constrained in such a way that we 
>>>>>> can't
>>>>>> expose the APIs required to implement the Fn API. 

Re: Pandas 2 Timeline Estimate

2023-07-12 Thread Robert Bradshaw via user
Contributions welcome! I don't think we're at the point we can stop
supporting Pandas 1.x though, so we'd have to do it in such a way as to
support both.

On Wed, Jul 12, 2023 at 4:53 PM XQ Hu via user  wrote:

> https://github.com/apache/beam/issues/27221#issuecomment-1603626880
>
> This tracks the progress.
>
> On Wed, Jul 12, 2023 at 7:37 PM Adlae D'Orazio 
> wrote:
>
>> Hello,
>>
>> I am currently trying to use Interactive Beam to run my pipelines through
>> a Jupyter notebook, but I
>> have internal packages depending on Pandas 2. When I install interactive
>> beam, it overrides these dependencies. I was wondering whether there's a
>> timeframe for moving Beam to Pandas 2? I know it has only been out
>> since late April, so it's not surprising that y'all haven't supported it
>> yet. Just curious. Thank you!
>>
>> Best,
>> Adlae
>>
>


Re: Getting Started With Implementing a Runner

2023-07-10 Thread Robert Bradshaw via user
On Sun, Jul 9, 2023 at 9:22 AM Joey Tran  wrote:

> Working on this on and off now and getting some pretty good traction.
>
> One thing I'm a little worried about is all the classes that are marked
> "internal use only". A lot of these seem either very useful or possibly
> critical to writing a runner. How strictly should I interpret these private
> implementation labels?
>
> A few bits that I'm interested in using ordered by how surprised I was to
> find that they're internal only.
>
>  - apache_beam.pipeline.AppliedPTransform
>  - apache_beam.pipeline.PipelineVisitor
>  - apache_beam.runners.common.DoFnRunner
>

The public API is the protos. You should not have to interact
with AppliedPTransform and PipelineVisitor directly (and while you can
reach in and do so, there are no promises here and these are subject to
change). As for DoFnRunner, if you're trying to reach in at this level
you're probably going to have to be replicating a bunch of surrounding
infrastructure as well. I would recommend using a BundleProcessor [1] to
coordinate the work (which will internally wire up the chain of DoFns
correctly and take them through their proper lifecycle). As mentioned
above, you can directly borrow the translations in fn_api_runner to go from
a full Pipeline graph (proto) to a set of fused DoFns to execute in
topological order (as ProcessBundleDescriptor protos, which is
what BundleProcessor accepts).

[1]
https://github.com/apache/beam/blob/release-2.48.0/sdks/python/apache_beam/runners/worker/bundle_processor.py#L851


> Thanks again for the help,
> Joey
>
> On Fri, Jun 23, 2023 at 8:34 PM Chamikara Jayalath 
> wrote:
>
>> Another advantage of a portable runner would be that it will be using
>> well defined and backwards compatible Beam portable APIs to communicate
>> with SDKs. I think this is especially important for runners that do not live
>> in the Beam repo since otherwise future SDK releases could break your
>> runner in subtle ways. Also portability gives you more flexibility when it
>> comes to choosing an SDK to define the pipeline and will allow you to
>> execute transforms in any SDK via cross-language.
>>
>> Thanks,
>> Cham
>>
>> On Fri, Jun 23, 2023 at 1:57 PM Robert Bradshaw via user <
>> user@beam.apache.org> wrote:
>>
>>>
>>>
>>> On Fri, Jun 23, 2023 at 1:43 PM Joey Tran 
>>> wrote:
>>>
>>>> Totally doable by one person, especially given the limited feature set
>>>>> you mention above.
>>>>> https://docs.google.com/presentation/d/1Cso0XP9dmj77OD9Bd53C1M3W1sPJF0ZnA20gzb2BPhE
>>>>>  is
>>>>> a good starting point as to what the relationship between a Runner and the
>>>>> SDK is at a level of detail sufficient for implementation (told from the
>>>>> perspective of an SDK, but the story is largely about the interface which
>>>>> is directly applicable).
>>>>
>>>>
>>>> Great slides, I really appreciate the illustrations.
>>>>
>>>> I hadn't realized there was a concept of an "SDK Worker", I had
>>>> imagined that once the Runner started execution of a workflow, it was
>>>> Runner all the way down. Is the Fn API the only way to implement a runner?
>>>> Our execution environment is a bit constrained in such a way that we can't
>>>> expose the APIs required to implement the Fn API. To be forthright, we
>>>> basically only have the ability to start a worker either with a known
>>>> Pub/Sub topic to expect data from and a Pub/Sub topic to write to; or with
>>>> a bundle of data to process and return the outputs for. We're constrained
>>>> from really any additional communication with a worker beyond that.
>>>>
>>>
>>> The "worker" abstraction gives the ability to wrap any user code in a
>>> way that it can be called from any runner. If you're willing to constrain
>>> the code you're wrapping (e.g. "Python DoFns only") then this "worker" can
>>> be a logical, rather than physical, concept.
>>>
>>> Another way to look at it is that in practice, the "runner" often has
>>> its own notion of "workers" which wrap (often in a 1:1 way) the logical
>>> "SDK Worker" (which in turn invokes the actual DoFns). This latter may be
>>> inlined (e.g. if it's 100% Python on both sides). See, for example,
>>> https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_ha

Re: Getting Started With Implementing a Runner

2023-06-23 Thread Robert Bradshaw via user
transforms and
>>> assess from there.
>>>
>>> Not sure if this is a lofty goal or not, so happy to hear your thoughts
>>> as to whether this seems reasonable and achievable without a large
>>> concerted effort or even if the general idea makes any sense. (I recognize
>>> that it might not be *easy*, but I don't have the resources to dedicate
>>> more than myself to work on a PoC)
>>>
>>
>> Totally doable by one person, especially given the limited feature set
>> you mention above.
>> https://docs.google.com/presentation/d/1Cso0XP9dmj77OD9Bd53C1M3W1sPJF0ZnA20gzb2BPhE
>> is a good starting point as to what the relationship between a Runner and
>> the SDK is at a level of detail sufficient for implementation (told from
>> the perspective of an SDK, but the story is largely about the interface
>> which is directly applicable).
>>
>> Given the limited feature set you proposed, this is similar to the
>> original Python portable runner which took a week or two to put together
>> (granted a lot has been added since then), or the typescript direct runner
>> (
>> https://github.com/apache/beam/blob/ea9147ad2946f72f7d52924cba2820e9aae7cd91/sdks/typescript/src/apache_beam/runners/direct_runner.ts
>> ) which was done (in its basic form, no support for side inputs and such)
>> in less than a week. Granted, as these are local runners, this illustrates
>> only the Beam-side complexity of things (not the work of actually
>> implementing a distributed shuffle, starting and assigning work to multiple
>> workers, etc.), but presumably that's the kind of thing your execution
>> environment already takes care of.
>>
>> As for some more concrete pointers, you could probably leverage a lot of
>> what's there by invoking create_stages
>>
>>
>> https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py#L362
>>
>> which will do optimization, fusion, etc. and then implementing your own
>> version of run_stages
>>
>>
>> https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py#L392
>>
>> to execute these in topological order on your compute infrastructure. (If
>> you're not doing streaming, this is much more straightforward than all the
>> bundler scheduler stuff that currently exists in that code).
>>
>>
>>
>>>
>>>
>>>
>>>
>>>
>>> On Fri, Jun 23, 2023 at 12:17 PM Alexey Romanenko <
>>> aromanenko@gmail.com> wrote:
>>>
>>>>
>>>>
>>>> On 23 Jun 2023, at 17:40, Robert Bradshaw via user <
>>>> user@beam.apache.org> wrote:
>>>>
>>>> On Fri, Jun 23, 2023, 7:37 AM Alexey Romanenko <
>>>> aromanenko@gmail.com> wrote:
>>>>
>>>>> If Beam Runner Authoring Guide is rather high-level for you, then, at
>>>>> fist, I’d suggest to answer two questions for yourself:
>>>>> - Am I going to implement a portable runner or native one?
>>>>>
>>>>
>>>> The answer to this should be portable, as non-portable ones will be
>>>> deprecated.
>>>>
>>>>
>>>> Well, actually this is a question that I don’t remember we discussed
>>>> here in details before and had a common agreement.
>>>>
>>>> Actually, I’m not sure that I understand clearly what is meant by
>>>> “deprecation" in this case. For example, Portable Spark Runner is heavily
>>>> actually based on native Spark RDD runner and its translations. So, which
>>>> part should be deprecated and what is a reason for that?
>>>>
>>>> Well, anyway I guess it’s off topic here.
>>>>
>>>> Also, we don’t know if this new runner will be contributed back to
>>>> Beam, what is a runtime and what actually is a final goal of it.
>>>> So I agree that more details on this would be useful.
>>>>
>>>> —
>>>> Alexey
>>>>
>>>>
>>>> - Which SDK I should use for this runner?
>>>>>
>>>>
>>>> The answer to the above question makes this one moot :).
>>>>
>>>> On a more serious note, could you tell us a bit more about the runner
>>>> you're looking at implementing?
>>>>
>>>>
>>>>> Then, depending on answers, I’d suggest to take as an example one of
>>>>>

Re: Getting Started With Implementing a Runner

2023-06-23 Thread Robert Bradshaw via user
On Fri, Jun 23, 2023 at 11:15 AM Joey Tran 
wrote:

> Thanks all for the responses!
>
> If Beam Runner Authoring Guide is rather high-level for you, then, at
>> fist, I’d suggest to answer two questions for yourself:
>> - Am I going to implement a portable runner or native one?
>>
>
> Portable sounds great, but the answer depends on how much additional cost
> it'd require to implement portable over non-portable, even considering
> future deprecation (unless deprecation is happening tomorrow). I'm not
> familiar enough to know what the additional cost is so I don't have a firm
> answer.
>

I would say it would not be that expensive to write it in a "portable
compatible" way (i.e. it uses the publicly-documented protocol as the
interface rather than reaching into internal details) even if it doesn't
use gRPC and fire up separate processes/docker images for the workers
(preferring to do all of that inline like the Python portable direct
runner does).


> - Which SDK I should use for this runner?
>>
> I'd be developing this runner against the python SDK and if the runner
> only worked with the python SDK that'd be okay in the short term
>

Yes. And if you do it the above way, it should be easy to extend (or not)
if/when the need arises.


> Also, we don’t know if this new runner will be contributed back to Beam,
>> what is a runtime and what actually is a final goal of it.
>
> Likely won't be contributed back to Beam (not sure if it'd actually be
> useful to a wide audience anyways).
>
> The context is we've been developing an in-house large-scale pipeline
> framework that encapsulates both the programming model and the
> runner/execution of data workflows. As it's grown, I keep finding myself
> just reimplementing features and abstractions Beam has already implemented,
> so I wanted to explore adopting Beam. Our execution environment is very
> particular though and our workflows require it (due to the way we license
> our software), so my plan was to try to create a very basic runner that
> uses our execution environment. The runner could have very few features
> e.g. no streaming, no metrics, no side inputs, etc. After that I'd probably
> introduce a shim for some of our internally implemented transforms and
> assess from there.
>
> Not sure if this is a lofty goal or not, so happy to hear your thoughts as
> to whether this seems reasonable and achievable without a large concerted
> effort or even if the general idea makes any sense. (I recognize that it
> might not be *easy*, but I don't have the resources to dedicate more than
> myself to work on a PoC)
>

Totally doable by one person, especially given the limited feature set you
mention above.
https://docs.google.com/presentation/d/1Cso0XP9dmj77OD9Bd53C1M3W1sPJF0ZnA20gzb2BPhE
is a good starting point as to what the relationship between a Runner and
the SDK is at a level of detail sufficient for implementation (told from
the perspective of an SDK, but the story is largely about the interface
which is directly applicable).

Given the limited feature set you proposed, this is similar to the original
Python portable runner which took a week or two to put together (granted a
lot has been added since then), or the typescript direct runner (
https://github.com/apache/beam/blob/ea9147ad2946f72f7d52924cba2820e9aae7cd91/sdks/typescript/src/apache_beam/runners/direct_runner.ts
) which was done (in its basic form, no support for side inputs and such)
in less than a week. Granted, as these are local runners, this illustrates
only the Beam-side complexity of things (not the work of actually
implementing a distributed shuffle, starting and assigning work to multiple
workers, etc.), but presumably that's the kind of thing your execution
environment already takes care of.

As for some more concrete pointers, you could probably leverage a lot of
what's there by invoking create_stages

https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py#L362

which will do optimization, fusion, etc. and then implementing your own
version of run_stages

https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py#L392

to execute these in topological order on your compute infrastructure. (If
you're not doing streaming, this is much more straightforward than all the
bundler scheduler stuff that currently exists in that code).



>
>
>
>
>
> On Fri, Jun 23, 2023 at 12:17 PM Alexey Romanenko <
> aromanenko@gmail.com> wrote:
>
>>
>>
>> On 23 Jun 2023, at 17:40, Robert Bradshaw via user 
>> wrote:
>>
>> On Fri, Jun 23, 2023, 7:37 AM Alexey Romanenko 
>> wrote:
>>
>>> If B

Re: Getting Started With Implementing a Runner

2023-06-23 Thread Robert Bradshaw via user
On Fri, Jun 23, 2023, 7:37 AM Alexey Romanenko 
wrote:

> If Beam Runner Authoring Guide is rather high-level for you, then, at
> fist, I’d suggest to answer two questions for yourself:
> - Am I going to implement a portable runner or native one?
>

The answer to this should be portable, as non-portable ones will be
deprecated.

- Which SDK I should use for this runner?
>

The answer to the above question makes this one moot :).

On a more serious note, could you tell us a bit more about the runner
you're looking at implementing?


> Then, depending on answers, I’d suggest to take as an example one of the
> most similar Beam runners and use it as a more detailed source of
> information along with Beam runner doc mentioned before.
>
> —
> Alexey
>
> On 22 Jun 2023, at 14:39, Joey Tran  wrote:
>
> Hi Beam community!
>
> I'm interested in trying to implement a runner with my company's execution
> environment but I'm struggling to get started. I've read the docs page
>  on
> implementing a runner but it's quite high level. Anyone have any concrete
> suggestions on getting started?
>
> I've started by cloning and running the hello world example. I've then
> subclassed `PipelineRunner` to create my own custom runner but at this point
> I'm a bit stuck. My custom runner just looks like
>
> class CustomRunner(runner.PipelineRunner):
>     def run_pipeline(self, pipeline, options):
>         self.visit_transforms(pipeline, options)
>
> And when using it I get an error about not having implemented "Impulse"
>
> NotImplementedError: Execution of []
> not implemented in runner .
>
> Am I going about this the right way? Are there tests I can run my custom
> runner against to validate it beyond just running the hello world example?
> I'm finding myself just digging through the beam source to try to piece
> together how a runner works and I'm struggling to get a foothold. Any
> guidance would be greatly appreciated, especially if anyone has any
> experience implementing their own python runner.
>
> Thanks in advance! Also, could I get a Slack invite?
> Cheers,
> Joey
>
>
>


Re: [Dataflow][Stateful] Bypass Dataflow Overrides?

2023-05-25 Thread Robert Bradshaw via user
The key in GroupIntoBatches is actually not semantically meaningful, and
for a batch pipeline the use of state/timers is not needed either. If all
you need to do is batch elements into groups of (at most) N, you can write
a DoFn that collects things in its process method and emits them when the
batch is full (and also in the finish bundle method, though some care needs
to be taken to handle windowing correctly). On the other hand, if you're
trying to limit the parallelism across all workers you'd likely need to
limit the number of concurrently-processed keys (which would require a
grouping of some sort onto a finite number of keys, unless you want to cap
your entire pipeline at a certain number of workers).
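
For concreteness, here is a rough sketch of that batching DoFn (in Python for
brevity; the class name and batch size are just illustrative, and it assumes a
batch pipeline in the global window so that finish_bundle can emit safely):

import apache_beam as beam
from apache_beam.transforms.window import GlobalWindow
from apache_beam.utils.timestamp import MIN_TIMESTAMP
from apache_beam.utils.windowed_value import WindowedValue

class BatchIntoGroups(beam.DoFn):
  def __init__(self, batch_size=100):
    self._batch_size = batch_size

  def start_bundle(self):
    self._batch = []

  def process(self, element):
    self._batch.append(element)
    if len(self._batch) >= self._batch_size:
      yield self._batch
      self._batch = []

  def finish_bundle(self):
    # Leftovers must be emitted as WindowedValues; this is the windowing
    # care mentioned above (this sketch just assumes the global window).
    if self._batch:
      yield WindowedValue(self._batch, MIN_TIMESTAMP, (GlobalWindow(),))
      self._batch = []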

On Thu, May 25, 2023 at 2:34 PM Evan Galpin  wrote:

> Understood, thanks for the clarification, I'll need to look more in-depth
> at my pipeline code then.  I'm definitely observing that all steps
> downstream from the Stateful step in my pipeline do not start until steps
> upstream of the Stateful step are fully completed.  The Stateful step is a
> RateLimit[1] transfer which borrows heavily from GroupIntoBatches.
>
> [1] https://gist.github.com/egalpin/162a04b896dc7be1d0899acf17e676b3
>
> On Thu, May 25, 2023 at 2:25 PM Robert Bradshaw via user <
> user@beam.apache.org> wrote:
>
>> The GbkBeforeStatefulParDo is an implementation detail used to send all
>> elements with the same key to the same worker (so that they can share
>> state, which is itself partitioned by worker). This does cause a global
>> barrier in batch pipelines.
>>
>> On Thu, May 25, 2023 at 2:15 PM Evan Galpin  wrote:
>>
>>> Hi all,
>>>
>>> I'm running into a scenario where I feel that Dataflow Overrides
>>> (specifically BatchStatefulParDoOverrides.GbkBeforeStatefulParDo ) are
>>> unnecessarily causing a batch pipeline to "pause" throughput since a GBK
>>> needs to have processed all the data in a window before it can output.
>>>
>>> Is it strictly required that GbkBeforeStatefulParDo must run before any
>>> stateful DoFn? If not, what failure modes is GbkBeforeStatefulParDo trying
>>> to protect against, and how can it be bypassed/disabled while still using
>>> DataflowRunner?
>>>
>>> Thanks,
>>> Evan
>>>
>>


Re: [Dataflow][Stateful] Bypass Dataflow Overrides?

2023-05-25 Thread Robert Bradshaw via user
The GbkBeforeStatefulParDo is an implementation detail used to send all
elements with the same key to the same worker (so that they can share
state, which is itself partitioned by worker). This does cause a global
barrier in batch pipelines.

On Thu, May 25, 2023 at 2:15 PM Evan Galpin  wrote:

> Hi all,
>
> I'm running into a scenario where I feel that Dataflow Overrides
> (specifically BatchStatefulParDoOverrides.GbkBeforeStatefulParDo ) are
> unnecessarily causing a batch pipeline to "pause" throughput since a GBK
> needs to have processed all the data in a window before it can output.
>
> Is it strictly required that GbkBeforeStatefulParDo must run before any
> stateful DoFn? If not, what failure modes is GbkBeforeStatefulParDo trying
> to protect against, and how can it be bypassed/disabled while still using
> DataflowRunner?
>
> Thanks,
> Evan
>


Re: [EXTERNAL] Re: Vulnerabilities in Transitive dependencies

2023-05-02 Thread Robert Bradshaw via user
Generally these types of vulnerabilities are only exploitable when
processing untrusted data and/or exposing a public service to the
internet. This is not the typical use of Beam (especially the latter),
but that's not to say Beam can't be used in this way. That being said,
it's preferable to simply update libraries rather than have to do this
kind of analysis. On that note, +1 to the removal of Avro from core as
a mitigation for this vulnerability, though that's sometimes easier
said than done.

On Tue, May 2, 2023 at 7:35 AM Brule, Joshua L. (Josh), CISSP via user
 wrote:
>
> The SnakeYAML analysis is exactly what I was looking for. The library of 
> concern is org.codehaus.jackson jackson-mapper-asl 1.9.13.

Are you looking at
https://security.snyk.io/package/maven/org.codehaus.jackson:jackson-mapper-asl/1.9.13
?

I see NOTE: "This vulnerability is only exploitable when the
non-default UNWRAP_SINGLE_VALUE_ARRAYS feature is enabled" which a
quick grep through our codebase indicates we do not use.

> Our scanner is reporting ~20 CVEs with a CVSS of >= 7 and ~60 CVEs total.
>
>
>
> Thank you,
>
> Josh
>
>
>
> From: Bruno Volpato 
> Date: Monday, May 1, 2023 at 9:04 PM
> To: user@beam.apache.org , Brule, Joshua (Josh) L., 
> CISSP 
> Subject: [EXTERNAL] Re: Vulnerabilities in Transitive dependencies
>
> Hi Joshua,
>
>
>
> It may take a lot of effort and knowledge to review whether CVEs are 
> exploitable or not.
>
> I have seen this kind of analysis done in a few cases, such as SnakeYAML 
> recently: https://s.apache.org/beam-and-cve-2022-1471 
> (https://github.com/apache/beam/issues/25449)
>
>
>
> If there is a patch available, I believe we should err on the side of caution 
> and update them (if possible).
>
>
>
> For the example that you mentioned, there is some work started by Alexey 
> Romanenko to remove/decouple Avro from Beam core, so we can upgrade to newer 
> versions: https://github.com/apache/beam/issues/25252.
>
> Another recent progress is Beam releasing a new version of its vendored gRPC 
> to move past some CVEs originated from protobuf-java: 
> https://github.com/apache/beam/issues/25746
>
>
>
>
>
> Is there any other particular dependency that you are concerned about?
>
> Please consider filing an issue at https://github.com/apache/beam/issues so 
> we can start tracking it.
>
>
>
>
>
> Best,
>
> Bruno
>
>
>
>
>
>
>
> On Mon, May 1, 2023 at 5:28 PM Brule, Joshua L. (Josh), CISSP via user 
>  wrote:
>
> Hello,
>
>
>
> I am hoping you could help me with our vulnerability remediation process. We 
> have several development teams using Apache Beam in their projects. When 
> performing our Software Composition Analysis (Third-Party Software) scan, 
> projects utilizing Apache Beam have an incredible number of CVEs, Jackson 
> Data Mapper being an extreme outlier.
>
>
>
> I Jackson Data Mapper is a transitive dependency via Avro but I am wondering. 
> Has the Apache Beam team reviewed these CVEs and found them NOT EXPLOITABLE 
> as implemented. Or if exploitable implemented mitigations pre/post usage of 
> the library?
>
>
>
> Thank you for your time,
>
> Josh
>
>
>
> Joshua Brule | Sr Information Security Engineer


Re: How Beam Pipeline Handle late events

2023-04-24 Thread Robert Bradshaw via user
On Fri, Apr 21, 2023 at 3:37 AM Pavel Solomin  wrote:
>
> Thank you for the information.
>
> I'm assuming you had a unique ID in records, and you observed some IDs 
> missing in Beam output comparing with Spark, and not just some duplicates 
> produced by Spark.
>
> If so, I would suggest to create a P1 issue at 
> https://github.com/apache/beam/issues

+1, ideally with enough information to reproduce. As far as I
understand, what you have should just work (but I'm not a flink
expert).

> Also, did you try setting --checkpointingMode=AT_LEAST_ONCE ?
>
> Unfortunately, I can't be more helpful here, but let me share some of the 
> gotchas I had from my previous experience of running Beam on top of Flink for 
> similar use-case (landing of data from messaging system into files):
>
> (1) https://github.com/apache/beam/issues/26041 - I've solved that by adding 
> a runId into file names which is re-generated between app (re) starts
>
> (2) I used processing time watermarks and simple window without lateness set 
> up - combining it with (1) achieved no data loss
>
> Best Regards,
> Pavel Solomin
>
> Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
>
>
>
>
>
>
> On Thu, 20 Apr 2023 at 02:18, Lydian  wrote:
>>
>> Yes, we did enabled this in our pipeline.
>>
>> On Wed, Apr 19, 2023 at 5:00 PM Pavel Solomin  wrote:
>>>
>>> Thank you
>>>
>>> Just to confirm: how did you configure Kafka offset commits? Did you have 
>>> this flag enabled?
>>>
>>>
>>> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#isCommitOffsetsInFinalizeEnabled--
>>>
>>>
>>> On Thursday, 20 April 2023, Trevor Burke  wrote:
>>> > Hi Pavel,
>>> > Thanks for the reply.
>>> > No, the event losses are not consistent. While we've been running our 
>>> > pipelines in parallel (Beam vs Spark) we are seeing some days with no 
>>> > event loss and some days with some, but it's always less than 0.05%
>>> >
>>> >
>>> > On Wed, Apr 19, 2023 at 8:07 AM Pavel Solomin  
>>> > wrote:
>>> >>
>>> >> Hello Lydian,
>>> >> Do you always observe data loss? Or - maybe, it happens only when you 
>>> >> restart your pipeline from a Flink savepoint? If you lose data only 
>>> >> between restarts - is you issue similar to 
>>> >> https://github.com/apache/beam/issues/26041 ?
>>> >>
>>> >> Best Regards,
>>> >> Pavel Solomin
>>> >>
>>> >> Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
>>> >>
>>> >>
>>> >>
>>> >>
>>> >> On Tue, 18 Apr 2023 at 18:58, Lydian  wrote:
>>> >>>
>>> >>> Hi,
>>> >>>
>>> >>> We are using Beam (Python SDK + Flink Runner) to backup our streaming 
>>> >>> data from Kafka to S3. To avoid hitting the s3 threshold, we use 1 
>>> >>> minute fixed window to group messages.  We've had similar pipeline in 
>>> >>> spark that we want to replace it with this new pipeline.  However, the 
>>> >>> Beam pipeline seems always having events missing, which we are thinking 
>>> >>> could be due to late events (because the number of missing events get 
>>> >>> lower when having higher allow_lateness)
>>> >>>
>>> >>> We've tried the following approach to avoid late events, but none of 
>>> >>> them are working:
>>> >>> 1.  Use Processing timestamp instead of event time. Ideally if 
>>> >>> windowing is using the processing timestamp, It shouldn't consider any 
>>> >>> event as late. But this doesn't seem to work at all.
>>> >>> 2.  Configure allow_lateness to 12 hour.  Given that approach 1 seems 
>>> >>> not working as expected, we've also configured the allow_lateness. But 
>>> >>> it still have missing events compared to our old spark pipelines.
>>> >>>
>>> >>> Here's the simplified code we have
>>> >>> ```
>>> >>> def add_timestamp(event: Any) -> Any:
>>> >>>     import time
>>> >>>     from apache_beam import window
>>> >>>     return window.TimestampedValue(event, time.time())
>>> >>>
>>> >>> (pipeline
>>> >>>     | "Kafka Read" >> ReadFromKafka(topic="test-topic",
>>> >>>                                     consumer_config=consumer_config)
>>> >>>     | "Adding 'trigger_processing_time' timestamp" >> beam.Map(add_timestamp)
>>> >>>     | "Window into Fixed Intervals" >> beam.WindowInto(
>>> >>>         beam.window.FixedWindows(fixed_window_size),
>>> >>>         allowed_lateness=beam.utils.timestamp.Duration(allowed_lateness))
>>> >>>     | "Write to s3" >> beam.ParDo(WriteBatchesToS3(s3_path)))
>>> >>> ```
>>> >>>
>>> >>> I am wondering:
>>> >>> 1. Is the add_timestamp approach correctly marked it to use processing 
>>> >>> time for windowing?  If so, why there still late event consider we are 
>>> >>> using processing time and not event time?
>>> >>> 2.  Are there are any other approaches to avoid dropping any late event 
>>> >>> besides ` allowed_lateness`?  In flink you can output those late events 
>>> >>> as side output, wondering if we can do similar thing in Beam as well? 
>>> >>> Would someone pr

Re: [Question] - Time series - cumulative sum in right order with python api in a batch process

2023-04-24 Thread Robert Bradshaw via user
You are correct in that the data may arrive in an unordered way.
However, once a window finishes, you are guaranteed to have seen all
the data up to that point (modulo late data) and can then confidently
compute your ordered cumulative sum.

You could do something like this:

def cumulative_sums(key, timestamped_values):
  running = 0
  for _, x in sorted(timestamped_values):
    running += x
    yield running

sums = (timestamped_data
  # Key by series (here a single constant key) with (timestamp, value) pairs.
  | beam.Map(lambda x, t=beam.DoFn.TimestampParam: ('sum_key', (t, x)))
  | beam.WindowInto(...)
  | beam.GroupByKey()
  | beam.FlatMapTuple(cumulative_sums))



On Mon, Apr 24, 2023 at 8:23 AM Guagliardo, Patrizio via user
 wrote:
>
> Hi together,
>
>
>
> I want to create a cumulative sum over a time series in a bounded batch 
> processing in Apache beam with the Python API. What you can do is to write a 
> cummulative sum with a stateful DoFn, but the problem you would still face is 
> that you cannot handle it this way when the data in unordered, which is the 
> case in a PCollection. Is there a way to make the cumulative sum over time in 
> a batch process? This is what i did (whithout order):
>
> import apache_beam as beam
> from apache_beam import TimeDomain
> from apache_beam.transforms.userstate import ReadModifyWriteStateSpec, TimerSpec, CombiningValueStateSpec
> from apache_beam.transforms.window import FixedWindows, GlobalWindows
>
>
> class TimestampedSumAccumulator(beam.DoFn):
>     SUM_STATE = 'sum'
>
>     def process(
>         self, element,
>         timestamp=beam.DoFn.TimestampParam,
>         sum_state=beam.DoFn.StateParam(
>             ReadModifyWriteStateSpec(SUM_STATE, beam.coders.FloatCoder()))
>     ):
>         sum_value = sum_state.read() or 0.0
>         # print(element)
>         sum_value += element[1]
>         sum_state.write(sum_value)
>         yield beam.transforms.window.TimestampedValue(sum_value, timestamp)
>
>
> with beam.Pipeline() as p:
>     sums = (p
>             | 'Create' >> beam.Create([
>                 (3.1, 3),
>                 (1.5, 1),
>                 (4.2, 4),
>                 (5.4, 5),
>                 (2.3, 2)
>             ])
>             | 'AddTimestamps' >> beam.Map(lambda x: beam.transforms.window.TimestampedValue(x[0], x[1]))
>             | 'AddKeys' >> beam.Map(lambda x: ('sum_key', x))
>             | 'Window' >> beam.WindowInto(FixedWindows(10))
>             | 'Accumulate' >> beam.ParDo(TimestampedSumAccumulator())
>             | 'Print' >> beam.Map(print))
>
>
>
> How could that be done to make the cumulative sum in the “right” order?
>
>
>
> Thank you very much in advance.
>
>
>
>
> 
> This e-mail and any attachments may be confidential or legally privileged. If 
> you received this message in error or are not the intended recipient, you 
> should destroy the e-mail message and any attachments or copies, and you are 
> prohibited from retaining, distributing, disclosing or using any information 
> contained herein. Please inform us of the erroneous delivery by return 
> e-mail. Thank you for your cooperation. For more information on how we use 
> your personal data please see our Privacy Notice.


Re: Avoid using docker when I use a external transformation

2023-04-18 Thread Robert Bradshaw via user
Docker is not necessary to expand the transform (indeed, by default it
should just pull the Jar and invoke it directly to start the expansion
service), but it is used as the environment in which to execute the
expanded transform.

It would be in theory possible to run the worker without docker as well.
This would involve manually starting up a worker in Java, manually starting
up an expansion service that points to this worker as its environment, and
then using that expansion service from Python. I've never done that myself,
so I don't know how easy it would be, but the "LOOPBACK" runner in Java
could give some insight into how this could be done.
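
For reference, the Python side of that setup is just a matter of passing the
expansion_service address explicitly. A sketch only: it assumes you have
already started an expansion service on localhost:8097 (for instance by
running the beam-sdks-java-io-expansion-service jar by hand) whose environment
points at your manually started worker, and it elides the pipeline options and
runner choice:

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka

with beam.Pipeline() as p:
  _ = (
      p
      | ReadFromKafka(
          consumer_config={'bootstrap.servers': 'localhost:9092'},
          topics=['my-topic'],
          # Use the already-running expansion service rather than letting
          # Beam start one itself (which is where docker gets involved).
          expansion_service='localhost:8097')
      | beam.Map(print))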



On Tue, Apr 18, 2023 at 5:22 PM Juan Romero  wrote:

> Hi.
>
> I have an issue when I try to run a kafka io pipeline in python on my
> local machine, because in my local machine it is not possible to install
> docker. Seems that beam try to use docker to pull and start the beam java
> sdk i order to start the expansion service. I tried to start manually the
> expansion service and define the expansion service url in the connector
> properties but in anyway it keeps asking by docker process. My question is
> if we can run a pipeline with external transformations without install
> docker.
>
> Looking forward to it. Thanks!!
>


Re: Is there any way to set the parallelism of operators like group by, join?

2023-04-18 Thread Robert Bradshaw via user
Yeah, I don't think we have a good per-operator API for this. If we were to
add it, it probably belongs in ResourceHints.

On Sun, Apr 16, 2023 at 11:28 PM Reuven Lax  wrote:

> Looking at FlinkPipelineOptions, there is a parallelism option you can
> set. I believe this sets the default parallelism for all Flink operators.
>
> On Sun, Apr 16, 2023 at 7:20 PM Jeff Zhang  wrote:
>
>> Thanks Holden, this would work for Spark, but Flink doesn't have such
>> kind of mechanism, so I am looking for a general solution on the beam side.
>>
>> On Mon, Apr 17, 2023 at 10:08 AM Holden Karau 
>> wrote:
>>
>>> To a (small) degree Sparks “new” AQE might be able to help depending on
>>> what kind of operations Beam is compiling it down to.
>>>
>>> Have you tried setting spark.sql.adaptive.enabled &
>>> spark.sql.adaptive.coalescePartitions.enabled
>>>
>>>
>>>
>>> On Mon, Apr 17, 2023 at 10:34 AM Reuven Lax via user <
>>> user@beam.apache.org> wrote:
>>>
>>>> I see. Robert - what is the story for parallelism controls on GBK with
>>>> the Spark or Flink runners?
>>>>
>>>> On Sun, Apr 16, 2023 at 6:24 PM Jeff Zhang  wrote:
>>>>
>>>>> No, I don't use dataflow, I use Spark & Flink.
>>>>>
>>>>>
>>>>> On Mon, Apr 17, 2023 at 8:08 AM Reuven Lax  wrote:
>>>>>
>>>>>> Are you running on the Dataflow runner? If so, Dataflow - unlike
>>>>>> Spark and Flink - dynamically modifies the parallelism as the operator
>>>>>> runs, so there is no need to have such controls. In fact these specific
>>>>>> controls wouldn't make much sense for the way Dataflow implements these
>>>>>> operators.
>>>>>>
>>>>>> On Sun, Apr 16, 2023 at 12:25 AM Jeff Zhang  wrote:
>>>>>>
>>>>>>> Just for performance tuning like in Spark and Flink.
>>>>>>>
>>>>>>>
>>>>>>> On Sun, Apr 16, 2023 at 1:10 PM Robert Bradshaw via user <
>>>>>>> user@beam.apache.org> wrote:
>>>>>>>
>>>>>>>> What are you trying to achieve by setting the parallelism?
>>>>>>>>
>>>>>>>> On Sat, Apr 15, 2023 at 5:13 PM Jeff Zhang 
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Thanks Reuven, what I mean is to set the parallelism in operator
>>>>>>>>> level. And the input size of the operator is unknown at compiling 
>>>>>>>>> stage if
>>>>>>>>> it is not a source
>>>>>>>>>  operator,
>>>>>>>>>
>>>>>>>>> Here's an example of flink
>>>>>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution/parallel/#operator-level
>>>>>>>>> Spark also support to set operator level parallelism (see groupByKey
>>>>>>>>> and reduceByKey):
>>>>>>>>> https://spark.apache.org/docs/latest/rdd-programming-guide.html
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Sun, Apr 16, 2023 at 1:42 AM Reuven Lax via user <
>>>>>>>>> user@beam.apache.org> wrote:
>>>>>>>>>
>>>>>>>>>> The maximum parallelism is always determined by the parallelism
>>>>>>>>>> of your data. If you do a GroupByKey for example, the number of keys 
>>>>>>>>>> in
>>>>>>>>>> your data determines the maximum parallelism.
>>>>>>>>>>
>>>>>>>>>> Beyond the limitations in your data, it depends on your execution
>>>>>>>>>> engine. If you're using Dataflow, Dataflow is designed to 
>>>>>>>>>> automatically
>>>>>>>>>> determine the parallelism (e.g. work will be dynamically split and 
>>>>>>>>>> moved
>>>>>>>>>> around between workers, the number of workers will autoscale, etc.), 
>>>>>>>>>> so
>>>>>>>>>> there's no need to explicitly set the parallelism of the execution.
>>>>>>>>>>
>>>>>>>>>> On Sat, Apr 15, 2023 at 1:12 AM Jeff Zhang 
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Besides the global parallelism of beam job, is there any way to
>>>>>>>>>>> set parallelism for individual operators like group by and join? I
>>>>>>>>>>> understand the parallelism setting depends on the underlying 
>>>>>>>>>>> execution
>>>>>>>>>>> engine, but it is very common to set parallelism like group by and 
>>>>>>>>>>> join in
>>>>>>>>>>> both spark & flink.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> Best Regards
>>>>>>>>>>>
>>>>>>>>>>> Jeff Zhang
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Best Regards
>>>>>>>>>
>>>>>>>>> Jeff Zhang
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Best Regards
>>>>>>>
>>>>>>> Jeff Zhang
>>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>> Best Regards
>>>>>
>>>>> Jeff Zhang
>>>>>
>>>> --
>>> Twitter: https://twitter.com/holdenkarau
>>> Books (Learning Spark, High Performance Spark, etc.):
>>> https://amzn.to/2MaRAG9
>>> YouTube Live Streams: https://www.youtube.com/user/holdenkarau
>>>
>>
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>


Re: Is there any way to set the parallelism of operators like group by, join?

2023-04-15 Thread Robert Bradshaw via user
What are you trying to achieve by setting the parallelism?

On Sat, Apr 15, 2023 at 5:13 PM Jeff Zhang  wrote:

> Thanks Reuven, what I mean is to set the parallelism in operator level.
> And the input size of the operator is unknown at compiling stage if it is
> not a source
>  operator,
>
> Here's an example of flink
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution/parallel/#operator-level
> Spark also support to set operator level parallelism (see groupByKey and
> reduceByKey):
> https://spark.apache.org/docs/latest/rdd-programming-guide.html
>
>
> On Sun, Apr 16, 2023 at 1:42 AM Reuven Lax via user 
> wrote:
>
>> The maximum parallelism is always determined by the parallelism of your
>> data. If you do a GroupByKey for example, the number of keys in your data
>> determines the maximum parallelism.
>>
>> Beyond the limitations in your data, it depends on your execution engine.
>> If you're using Dataflow, Dataflow is designed to automatically determine
>> the parallelism (e.g. work will be dynamically split and moved around
>> between workers, the number of workers will autoscale, etc.), so there's no
>> need to explicitly set the parallelism of the execution.
>>
>> On Sat, Apr 15, 2023 at 1:12 AM Jeff Zhang  wrote:
>>
>>> Besides the global parallelism of beam job, is there any way to set
>>> parallelism for individual operators like group by and join? I
>>> understand the parallelism setting depends on the underlying execution
>>> engine, but it is very common to set parallelism like group by and join in
>>> both spark & flink.
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> --
>>> Best Regards
>>>
>>> Jeff Zhang
>>>
>>
>
> --
> Best Regards
>
> Jeff Zhang
>


Re: Message guarantees

2023-04-14 Thread Robert Bradshaw via user
That is correct.

On Tue, Apr 11, 2023 at 5:44 AM Hans Hartmann  wrote:

>
> Hello,
>
> i'm wondering if Apache Beam is using the message guarantees of the
> execution engines, that the pipeline is running on.
>
> So if i use the SparkRunner the consistency guarantees are exactly-once?
>
> Have a good day and thanks,
>
> Hans Hartmann
>
>


Re: Why is FlatMap different from composing Flatten and Map?

2023-03-15 Thread Robert Bradshaw via user
On Mon, Mar 13, 2023 at 11:33 AM Godefroy Clair 
wrote:

> Hi,
> I am wondering about the way `Flatten()` and `FlatMap()` are implemented
> in Apache Beam Python.
> In most functional languages, FlatMap() is the same as composing
> `Flatten()` and `Map()` as indicated by the name, so Flatten() and
> Flatmap() have the same input.
> But in Apache Beam, Flatten() is using _iterable of PCollections_ while
> FlatMap() is working with _PCollection of Iterables_.
>
> If I am not wrong, the signature of Flatten, Map and FlatMap are :
> ```
> Flatten:: Iterable[PCollections[A]] -> PCollection[A]
> Map:: (PCollection[A], (A-> B)) -> PCollection[B]
> FlatMap:: (PCollection[Iterable[A]], (A->B)) -> [A]
>

FlatMap is actually (PCollection[A], (A->Iterable[B])) -> PCollection[B].


> ```
>
> So my question is is there another "Flatten-like" function  with this
> signature :
> ```
> anotherFlatten:: PCollection[Iterable[A]] -> PCollection[A]
> ```
>
> One of the reason this would be useful, is that when you just want to
> "flatten" a `PCollection` of `iterable` you have to use `FlatMap()`with an
> identity function.
>
> So instead of writing:
> `FlatMap(lambda e: e)`
> I would like to use a function
> `anotherFlatten()`
>

As Reuven mentions, Beam's Flatten could have been called Union, in which
case we'd free up the name Flatten for the PCollection[Iterable[A]] ->
PCollection[A] operation. It's Flatten for historical reasons, and would be
difficult to change now.

FlumeJava uses static constructors to provide Flatten.Iterables:
PCollection[Iterable[A]] -> PCollection[A] vs.  Flatten.PCollections:
Iterable[PCollection[A]] -> PCollection[A].

If you want a FlattenIterables in Python, you could easily implement it as
a composite transform [2] whose implementation is passing the identity
function to FlatMap.
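
For example, a minimal sketch of such a composite (the name FlattenIterables
is just illustrative here, not an existing API):

import apache_beam as beam

class FlattenIterables(beam.PTransform):
  """PCollection[Iterable[A]] -> PCollection[A]."""
  def expand(self, pcoll):
    # The identity function: each element of each iterable is emitted.
    return pcoll | beam.FlatMap(lambda iterable: iterable)

with beam.Pipeline() as p:
  flat = (p
          | beam.Create([[1, 2], [3], [4, 5]])
          | FlattenIterables())  # contains 1, 2, 3, 4, 5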

[1]
https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/transforms/Flatten.html
[2]
https://beam.apache.org/documentation/programming-guide/#composite-transforms


Re: Deduplicate usage

2023-03-02 Thread Robert Bradshaw via user
Whenever state is used, the runner will arrange for all elements with the
same key to go to the same worker, which often involves injecting a
shuffle-like operation if the keys are spread out among many workers
in the input. (An alternative implementation could involve storing the
state in a distributed transactional store with the appropriate
locks.) There is no need for you to do anything before calling the
Deduplicate transform.
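
In other words, usage is just the following (a sketch; the module path and the
processing_time_duration parameter are from memory of the Python SDK's
deduplicate module, so double-check them against your SDK version):

import apache_beam as beam
from apache_beam.transforms import deduplicate
from apache_beam.utils.timestamp import Duration

with beam.Pipeline() as p:
  unique = (
      p
      | beam.Create(['a', 'b', 'a', 'c', 'b'])
      # No explicit reshuffle or GroupByKey is needed beforehand: the runner
      # routes identical elements (keys) to the worker holding their state.
      | deduplicate.Deduplicate(
          processing_time_duration=Duration(seconds=600))
      | beam.Map(print))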

On Thu, Mar 2, 2023 at 4:34 PM Binh Nguyen Van  wrote:
>
> Thanks Reuven,
>
> I got the idea of the state is per key and keys are distributed across 
> workers but I am trying to understand where/how the distribution part is 
> implemented so that elements with the same keys will go to the same worker. 
> Do I need to do this before calling `Deduplicate` transform? If not then 
> where is it being implemented?
>
> Thanks
> -Binh
>
>
> On Thu, Mar 2, 2023 at 12:57 PM Reuven Lax via user  
> wrote:
>>
>> State is per-key, and keys are distributed across workers. Two workers 
>> should not be working on the same state.
>>
>> On Thu, Mar 2, 2023 at 10:48 AM Binh Nguyen Van  wrote:
>>>
>>> Thank you Ankur,
>>>
>>> This is the current source code of Deduplicate transform.
>>>
>>>   Boolean seen = seenState.read();
>>>   // Seen state is either set or not set so if it has been set then it 
>>> must be true.
>>>   if (seen == null) {
>>> // We don't want the expiry timer to hold up watermarks.
>>> expiryTimer.offset(duration).withNoOutputTimestamp().setRelative();
>>> seenState.write(true);
>>> receiver.output(element);
>>>   }
>>>
>>> Could you please explain the synchronization for the following scenario?
>>>
>>> There are two workers.
>>> Both workers read the same state at the same time and the state was not set 
>>> yet. In this case, both will get null in the response (I believe)
>>> Both of them will try to set the state and send the output out.
>>>
>>> What will happen in this scenario?
>>>
>>> Thank you
>>> -Binh
>>>
>>>
>>> On Thu, Mar 2, 2023 at 10:29 AM Ankur Goenka  wrote:

 Hi Binh, The Deduplicate transform uses state api to do the de-duplication 
 which should do the needful operations to work across multiple concurrent 
 workers.

 Thanks,
 Ankur

 On Thu, 2 Mar 2023 at 10:18, Binh Nguyen Van  wrote:
>
> Hi,
>
> I am writing a pipeline and want to apply deduplication. I look at 
> Deduplicate transform that Beam provides and wonder about its usage. Do I 
> need to shuffle input collection by key before calling this 
> transformation? I look at its source code and it doesn’t do any shuffle 
> so wonder how it works when let’s say there are duplicates and the 
> duplicated elements are processed concurrently on multiple workers.
>
> Thank you
> -Binh


Re: OpenJDK8 / OpenJDK11 container deprecation

2023-02-07 Thread Robert Bradshaw via user
Seams reasonable to me.

On Tue, Feb 7, 2023 at 4:19 PM Luke Cwik via user  wrote:
>
> As per [1], the JDK8 and JDK11 containers that Apache Beam uses have stopped 
> being built and supported since July 2022. I have filed [2] to track the 
> resolution of this issue.
>
> Based upon [1], almost everyone is swapping to the eclipse-temurin 
> container[3] as their base based upon the linked issues from the deprecation 
> notice[1]. The eclipse-temurin container is released under these licenses:
> Apache License, Version 2.0
> Eclipse Distribution License 1.0 (BSD)
> Eclipse Public License 2.0
> - (Secondary) GNU General Public License, version 2 with OpenJDK Assembly 
> Exception
> - (Secondary) GNU General Public License, version 2 with the GNU Classpath 
> Exception
>
> I propose that we swap all our containers to the eclipse-temurin 
> containers[3].
>
> Open to other ideas and also would be great to hear about your experience in 
> any other projects that you have had to make a similar decision.
>
> 1: https://github.com/docker-library/openjdk/issues/505
> 2: https://github.com/apache/beam/issues/25371
> 3: https://hub.docker.com/_/eclipse-temurin


Re: How to submit beam python pipeline to GKE flink cluster

2023-02-03 Thread Robert Bradshaw via user
You should be able to omit the environment_type and environment_config
variables and they will be populated automatically. For running
locally, the flink_master parameter is not needed either (one will be
started up automatically).
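
In other words, something like this should be enough (a sketch; substitute
your GKE JobManager's REST address for flink_master, or drop that flag too to
run against a local in-process cluster):

from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    "--runner=FlinkRunner",
    "--flink_version=1.14",
    "--flink_master=<jobmanager-host>:8081",
])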

On Fri, Feb 3, 2023 at 12:51 PM Talat Uyarer via user
 wrote:
>
> Hi,
>
> Do you use Flink operator or manually deployed session cluster ?
>
> Thanks
>
> On Fri, Feb 3, 2023, 4:32 AM P Singh  wrote:
>>
>> Hi Team,
>>
>> I have set up a flink cluster on GKE and am trying to submit a beam pipeline 
>> with below options. I was able to run this on a local machine but I don't 
>> understand what would be the environment_config? What should I do? what to 
>> put here instead of localhost:5
>>
>> Please help.
>> options = PipelineOptions([
>> "--runner=FlinkRunner",
>> "--flink_version=1.14",
>> "--flink_master=localhost:8081",
>> "--environment_type=EXTERNAL", #EXTERNAL
>> "--environment_config=localhost:5",
>> ])


Re: Dataflow and mounting large data sets

2023-01-30 Thread Robert Bradshaw via user
I'm also not sure it's part of the contract that the containerization
technology we use will always have these capabilities.

On Mon, Jan 30, 2023 at 10:53 AM Chad Dombrova  wrote:
>
> Hi Valentyn,
>
>>
>> Beam SDK docker containers on Dataflow VMs are currently launched in 
>> privileged mode.
>
>
> Does this only apply to stock sdk containers?  I'm asking because we use a 
> custom sdk container that we build.  We've tried various ways of running 
> mount from within our custom beam container in Dataflow and we could not get 
> it to work, while the same thing succeeds in local tests and in our CI 
> (gitlab).  The assessment at the time (this was maybe a year ago) was that 
> the container was not running in privileged mode, but if you think that's 
> incorrect we can revisit this and report back with some error logs.
>
> -chad
>


Re: Dataflow and mounting large data sets

2023-01-30 Thread Robert Bradshaw via user
Different idea: is it possible to serve this data via another protocol
(e.g. sftp) rather than requiring a mount?

On Mon, Jan 30, 2023 at 9:26 AM Chad Dombrova  wrote:
>
> Hi Robert,
> I know very little about the FileSystem classes, but I don’t think it’s 
> possible for a process running in docker to create an NFS mount without 
> running in privileged [1] mode, which cannot be done with Dataflow. The other 
> ways of gaining access to a mount are:
>
> A. the node running docker has the NFS mount itself and passes it along using 
> docker run --volume.
> B. the mount is created within the container by using docker run --mount.
>
> Neither of these are possible with Dataflow.
>
> Here’s a full example of how an NFS mount can be created when running docker:
>
> docker run -it --network=host \
>--mount 
> 'type=volume,src=pipe-nfs-test,dst=/Volumes/pipe-nfs-test,volume-driver=local,volume-opt=type=nfs,volume-opt=device=:/pipe,"volume-opt=o=addr=turbohal.luma.mel,vers=3"'
>   \
>luma/pipe-shell -- bash
>
> In my ideal world, I would make a PR to add support for the docker --mount 
> flag to Beam for the runners that I can control, and the Dataflow team would 
> add support on their end.
>
> Let me know if I'm missing anything.
>
> https://docs.docker.com/engine/reference/run/#runtime-privilege-and-linux-capabilities
>
> thanks,
> -chad


Re: Dataflow and mounting large data sets

2023-01-30 Thread Robert Bradshaw via user
If it's your input/output data, presumably you could implement a
https://beam.apache.org/releases/javadoc/2.3.0/org/apache/beam/sdk/io/FileSystem.html
for nfs. (I don't know what all that would entail...)
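
Very roughly, that would mean subclassing the FileSystem base class and
providing a scheme for it. An illustrative skeleton only: the base class
defines the full set of abstract methods (e.g. match, create, open, copy,
rename, delete) that would need real implementations against the filer:

from apache_beam.io.filesystem import FileSystem

class NfsFileSystem(FileSystem):
  """Hypothetical FileSystem backed by NFS-served posix paths."""

  @classmethod
  def scheme(cls):
    # Paths like nfs://server/share/... would then route to this filesystem.
    return 'nfs'

  # ... implement the remaining abstract methods in terms of posix/NFS
  # operations against the filer ...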

On Mon, Jan 30, 2023 at 9:04 AM Chad Dombrova  wrote:
>
> Hi Israel,
> Thanks for responding.
>
>> And could not the dataset be accessed from Cloud Storage? Does it need to be 
>> specifically NFS?
>
>
> No unfortunately it can't be accessed from Cloud Storage.   Our data resides 
> on high performance Isilon [1] servers using a posix filesystem, and NFS is 
> the tried and true protocol for this.  This configuration cannot be changed 
> for a multitude of reasons, not least of which is that fact that these 
> servers outperform cloud storage at a fraction of the cost of cloud offerings 
> (which is a very big difference for multiple petabytes of storage.  If you'd 
> like more details on why this is not possible I'm happy to explain, but for 
> now let's just say that it's been investigated and it's not practical).  The 
> use of fast posix filers over NFS is fairly ubiquitous in the media and 
> entertainment industry (if you want to know more about how we use Beam, I 
> gave a talk at the Beam Summit a few years ago[2]).
>
> thanks!
> -chad
>
> [1] https://www.dell.com/en-hk/dt/solutions/media-entertainment.htm
> [2] https://www.youtube.com/watch?v=gvbQI3I03a8&t=644s&ab_channel=ApacheBeam
>