Re: Apache Beam Python SDK - Inaccurate session window interval
The session gap duration of 60 minutes tells the runner to close the window
when there is a gap of at least 60 minutes between records. So for a given key
X, if we had data at timestamps 1:01, 1:30, 2:45 and 2:50, then 1:01 and 1:30
would be in one session while 2:45 and 2:50 would be in a second session. If
I'm misunderstanding, maybe you could share some data with timestamps and what
you expect the output to be.

On Fri, Apr 24, 2020 at 2:17 AM Yohei Onishi wrote:

> Hi,
>
> I also deployed the application to Dataflow and then got the same result.
> The actual session interval was not the same as the given session interval
> (60 minutes).
>
> ---
> new 2 record(s) in 1:50:00 session (start 2018-10-19 11:10:00 end 2018-10-19 13:00:00)
> new 2 record(s) in 1:01:00 session (start 2018-10-19 10:02:00 end 2018-10-19 11:03:00)
> new 5 record(s) in 3:00:00 session (start 2018-10-19 10:00:00 end 2018-10-19 13:00:00)
> ---
>
> https://stackoverflow.com/questions/61402895/apache-beam-python-sdk-inaccurate-session-window-interval
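Those durations are consistent with the gap semantics described above: a
session window extends from its first record through its last record plus the
60-minute gap, so a session whose records span two hours reports a 3:00:00
duration even though the gap setting is correct. A minimal DirectRunner
sketch, not taken from the thread and using hypothetical timestamps for the
1:01, 1:30, 2:45 and 2:50 example (expressed as seconds), shows the merging
behavior:

-
import apache_beam as beam
from apache_beam.transforms import window

# Hypothetical records for key "X" at 1:01, 1:30, 2:45 and 2:50 (in seconds).
records = [("X", 3660), ("X", 5400), ("X", 9900), ("X", 10200)]

with beam.Pipeline() as pipeline:
    _ = (
        pipeline
        | beam.Create(records)
        | beam.Map(lambda kv: window.TimestampedValue(kv, kv[1]))
        | beam.WindowInto(window.Sessions(60 * 60))  # 60-minute gap
        | beam.GroupByKey()
        | beam.MapTuple(lambda key, values: (key, sorted(values)))
        # Expect two sessions: ('X', [3660, 5400]) and ('X', [9900, 10200]);
        # each window runs from its first record to its last record plus the
        # 60-minute gap, so its duration is longer than the gap itself.
        | beam.Map(print)
    )
-

The 1:01 and 1:30 records are less than 60 minutes apart, so they merge into
one session; the 75-minute gap before 2:45 starts a new one.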
Re: SparkRunner on k8s
> In other words, are there options to the job runner that would eventually
> translate to ' --volume /storage1:/storage1 ' while the docker container is
> being run by Flink? Even if it means code changes and building from source,
> it's fine. Please point me in the right direction.

I found an open feature request for this, but unfortunately it looks like
neither of the two attempted implementations ended up being merged:
https://issues.apache.org/jira/browse/BEAM-5440

Sorry I haven't had much time to look into your issue with the Spark runner.
If you are still interested in trying it, you might try using a different
Beam version and see if the problem persists.

On Wed, Apr 22, 2020 at 7:56 PM Ramanan, Buvana (Nokia - US/Murray Hill)
<buvana.rama...@nokia-bell-labs.com> wrote:

> Hi Kyle,
>
> About FlinkRunner:
>
> "One is you can mount a directory from the Docker host inside the
> container(s). But the more scalable solution is to use a distributed file
> system, such as HDFS, Google Cloud Storage, or Amazon S3"
>
> I am running some benchmarking tests, so I prefer not to use GCS or S3
> (as the network delay can kill the performance).
>
> I would like to focus on the option of the host mounting the volume into
> the containers, but I have not come across a docker command where a host
> can mount volumes into running containers. I do not think 'docker create'
> volume will help here; please correct me if I am wrong.
>
> Is there a way the job runner can tell the Flink cluster to mount certain
> volumes before running the SDK container? And if so, is there a way I can
> tell the job runner to tell Flink to mount these volumes?
>
> In other words, are there options to the job runner that would eventually
> translate to ' --volume /storage1:/storage1 ' while the docker container is
> being run by Flink? Even if it means code changes and building from source,
> it's fine. Please point me in the right direction.
>
> Thanks,
> Buvana
> --
> *From:* Kyle Weaver
> *Sent:* Monday, April 13, 2020 7:34 PM
> *To:* user@beam.apache.org
> *Subject:* Re: SparkRunner on k8s
>
> > It appears that the filesystem in the client side is not the same as the
> > environment that Flink creates to run the Beam pipeline (I think Flink
> > does a docker run of the python sdk to run the Beam pipeline? In that
> > case, how would the container know where to write the file?)
>
> You are correct. Beam Python execution takes place within a Docker
> container, or often multiple containers, depending on your pipeline and
> configuration. Multiple containers is probably the cause of the error here.
> The Python SDK doesn't do anything special with local file paths; it just
> writes them to the local file system of the container. So in order to get a
> persistent, shared file system, you have a couple of options. One is you
> can mount a directory from the Docker host inside the container(s). But the
> more scalable solution is to use a distributed file system, such as HDFS,
> Google Cloud Storage, or Amazon S3. Check out the Beam programming guide
> for more info:
> https://beam.apache.org/documentation/programming-guide/#pipeline-io
>
> On Mon, Apr 13, 2020 at 6:55 PM Ramanan, Buvana (Nokia - US/Murray Hill)
> <buvana.rama...@nokia-bell-labs.com> wrote:
>
> Kyle,
>
> Thanks a lot for the pointers. I got interested in running my beam pipeline
> on FlinkRunner, got a local Flink cluster set up, and tested a sample code,
> which works fine.
> I started the Beam job runner going:
>
> docker run --net=host apachebeam/flink1.8_job_server:latest --flink-master $IP:8081 --job-host $IP --job-port 8099
>
> Submitted a beam pipeline, which when run with LocalRunner works totally
> fine. The last stage of the pipeline code looks as follows:
>
> . . .
> . . .
> . . .
> output = (
>     {
>         'Mean Open': mean_open,
>         'Mean Close': mean_close
>     } |
>     beam.CoGroupByKey() |
>     beam.io.WriteToText(args.output)
> )
>
> So, we are ending the pipeline with an io.WriteToText().
>
> Now, when I supply a filename, whether residing on local disk (/tmp) or a
> network-mounted disk (e.g. /nas2), I get the following error:
>
> python test-beam.py --input data/sp500.csv --output /tmp/result.txt
>
> WARNING:root:Make sure that locally built Python SDK docker image has
> Python 3.6 interpreter.
> ERROR:root:java.lang.RuntimeException: Error received from SDK harness for
> instruction 2: Traceback (most recent call last):
>   File "apache_beam/runners/common.py", line 883, in
>     apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 667, in
>     apache_beam.runners.common.PerWindowInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 748, in
>     apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
>   File "/usr/local/lib/python3.6/site-packages/apache_beam/io/iobase.py",
>     line 1095, in _fina
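Kyle's explanation earlier in the thread points at the likely cause of this
error: a path like /tmp/result.txt is resolved inside the SDK harness
container(s), not on the machine that submitted the job. A sketch of the
distributed-filesystem route he suggests, not taken from the thread, with a
hypothetical bucket name and assuming the matching filesystem extras (for
example apache-beam[gcp] for gs:// paths) are installed in the harness image:

-
import apache_beam as beam

with beam.Pipeline() as pipeline:
    _ = (
        pipeline
        | beam.Create(["hello", "world"])
        # An object-store path is visible to every SDK harness container,
        # unlike /tmp/result.txt, which exists only inside whichever
        # container happens to write it.
        | beam.io.WriteToText("gs://some-shared-bucket/results/output")
    )
-

In the pipeline above, that amounts to passing an object-store path as
--output instead of a local or NFS path.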
Re: Stateful & Timely Call
Sounds like a good addition to the Beam patterns page, Reza :)

On Fri, Apr 24, 2020 at 3:22 AM Aniruddh Sharma wrote:
>
> Thanks Robert,
>
> This is a life saver and it's a great help :). It works like a charm.
>
> Thanks
> Aniruddh
>
> On Thu, Apr 23, 2020 at 4:45 PM Robert Bradshaw wrote:
>>
>> I may have misinterpreted your email, I thought you didn't have a need for
>> keys at all. If this is actually the case, you don't need a GroupByKey,
>> just have your DoFn take Rows as input, and emit List<Row> as output. That
>> is, it's a DoFn<Row, List<Row>>.
>>
>> You can buffer multiple Rows in an instance variable between process
>> element calls. For example,
>>
>> class MyBufferingDoFn<T> extends DoFn<T, List<T>> {
>>   private List<T> buffer = new ArrayList<>();
>>   private BoundedWindow lastWindow;
>>
>>   @ProcessElement
>>   public void processElement(@Element T elt, BoundedWindow window,
>>       OutputReceiver<List<T>> out) {
>>     buffer.add(elt);
>>     lastWindow = window;
>>     if (buffer.size() > 100) {
>>       out.output(buffer);
>>       buffer = new ArrayList<>();
>>     }
>>   }
>>
>>   // Flush any remaining buffered elements when the bundle finishes.
>>   @FinishBundle
>>   public void finishBundle(FinishBundleContext c) {
>>     if (!buffer.isEmpty()) {
>>       c.output(buffer, lastWindow.maxTimestamp(), lastWindow);
>>       buffer = new ArrayList<>();
>>     }
>>   }
>> }
>>
>> See
>> https://beam.apache.org/releases/javadoc/2.20.0/org/apache/beam/sdk/transforms/ParDo.html
>> for more information on the lifetime of DoFns.
>>
>> As for why your GBK is taking so long, yes, this can be a bottleneck.
>> However, it should be noted that Dataflow (like most other runners)
>> executes this step in conjunction with other steps as part of a "fused
>> stage." So if your pipeline looks like
>>
>> Read -> DoFnA -> GBK -> DoFnB -> Write
>>
>> then Read, DoFnA, and GBK[part1] will execute concurrently (all starting
>> up almost immediately), one element at a time, and when that's finished,
>> GBK[part2], DoFnB, and Write will execute concurrently, one element at a
>> time, so you can't just look at the last unfinished stage to determine
>> where the bottleneck is. (One helpful tool, however, is looking at the
>> amount of time spent on each step in the UI.)
>>
>> Hopefully that helps.
>>
>> - Robert
>>
>> On Thu, Apr 23, 2020 at 12:43 PM Aniruddh Sharma wrote:
>>>
>>> Thanks Robert and Luke
>>>
>>> This approach seems good to me. I am trying that; I have to include a
>>> GroupBy to make an Iterable available to the ParDo function to do the
>>> same. Now GroupBy is a bottleneck: it has been working for the last 2
>>> hours and processed only 40 GB of data (still waiting for the rest of the
>>> 100s of GB of data).
>>>
>>> Currently I used GroupByKey.Create()
>>>
>>> What's the recommended way to choose keys to make it execute faster: the
>>> same key for all rows, vs a different key for each row, vs the same key
>>> for a group of rows?
>>>
>>> Thanks
>>> Aniruddh
>>>
>>> On Thu, Apr 23, 2020 at 12:47 PM Luke Cwik wrote:

As Robert suggested, what prevents you from doing:

ReadFromBQ -> ParDo(BatchInMemory) -> DLP

where BatchInMemory stores elements in an in-memory list in the
@ProcessElement method and produces output every time the list is large
enough, with a final output in the @FinishBundle method?

On Thu, Apr 23, 2020 at 9:42 AM Aniruddh Sharma wrote:
>
> Hi Luke
>
> Sorry, I forgot to mention the functions. Dataflow adds the following
> function, and ["PartitionKeys", new GroupByKeyAndSortValuesOnly] is super
> slow. How to choose keys to make it faster?
> > .apply("ReifyWindows", ParDo.of(new ReifyWindowedValueFn<>())) > .setCoder( > KvCoder.of( > keyCoder, > KvCoder.of(InstantCoder.of(), > WindowedValue.getFullCoder(kvCoder, windowCoder > > // Group by key and sort by timestamp, dropping windows as they > are reified > .apply("PartitionKeys", new GroupByKeyAndSortValuesOnly<>()) > > // The GBKO sets the windowing strategy to the global default > .setWindowingStrategyInternal(inputWindowingStrategy); > > THanks > ANiruddh > > On 2020/04/23 16:35:58, Aniruddh Sharma wrote: > > Thanks Luke for your response. > > > > My use case is following. > > a) I read data from BQ (TableRow) > > b) Convert it into (Table.Row) for DLP calls. > > c) have to batch Table.Row collection up to a max size of 512 KB (i.e > > fit may rows from BQ into a single DLP table) and call DLP. > > > > Functionally, I don't have a need of key and window. As I just want to > > fit rows in DLP table up to a max size. > > > > In batch mode, when I call StateFulAPI, > > it adds a "BatchStatefulParDoOverrides.GroupByKeyAndSortValuesOnly" > > step and this step is super slow. Like it is running on 50 node cluster > > for 800 GB data for last 10 hours. > > > > This step is not added when I call Dataflow in streaming mode. But I > > can't call it in Streaming mode for other reasons. > > > > So I am trying to understand following > > a) Eith
Re: Apache Beam Python SDK - Inaccurate session window interval
Hi,

I also deployed the application to Dataflow and then got the same result. The
actual session interval was not the same as the given session interval (60
minutes).

---
new 2 record(s) in 1:50:00 session (start 2018-10-19 11:10:00 end 2018-10-19 13:00:00)
new 2 record(s) in 1:01:00 session (start 2018-10-19 10:02:00 end 2018-10-19 11:03:00)
new 5 record(s) in 3:00:00 session (start 2018-10-19 10:00:00 end 2018-10-19 13:00:00)
---

https://stackoverflow.com/questions/61402895/apache-beam-python-sdk-inaccurate-session-window-interval
Apache Beam Python SDK - Inaccurate session window interval
Hi,

I am trying to process data with a 60-minute session interval using the Apache
Beam Python SDK. But the actual session interval was inaccurate, such as
3:00:00 or 1:01:00 or 1:50:00, when I run my application locally using the
DirectRunner.

Would you help me find a solution to fix this issue and process data with
60-minute sessions?

I built my pipeline as below.

-
with Pipeline(options=pipeline_options) as pipeline:
    (
        pipeline
        | "Read" >> ReadFromText(known_args.input, skip_header_lines=1)
        | "Convert" >> ParDo(Convert())
        | "Add Timestamp" >> Map(lambda x: window.TimestampedValue(x,
            get_timestamp_from_element(x).timestamp()))
        | "Use User ID As Key" >> Map(lambda x: (x["user_id"], x))
        | "Apply Session Window" >> WindowInto(window.Sessions(known_args.session_interval))
        | "Group" >> GroupByKey()
        | "Write To CSV" >> ParDo(WriteToCSV(known_args.output))
    )
    result = pipeline.run()
    result.wait_until_finish()
-

session_interval (60 minutes) is provided as below.

-
parser.add_argument(
    "--session_interval",
    help="Interval of each session",
    default=60*60)  # 60 mins
-

The WriteToCSV function processes data per session. I logged the session
duration, but it was not accurate.

-
class WriteToCSV(DoFn):
    def __init__(self, output_path):
        self.output_path = output_path

    def process(self, element, window=DoFn.WindowParam):
        window_start = window.start.to_utc_datetime()
        window_end = window.end.to_utc_datetime()
        duration = window_end - window_start
        logging.info(">>> new %s record(s) in %s session (start %s end %s)",
                     len(click_records), duration, window_start, window_end)
-

Then I got these log messages when I ran this application locally with the
DirectRunner.

-
new 5 records in 3:00:00 session (start 2018-10-19 02:00:00 end 2018-10-19 05:00:00)
new 2 records in 1:01:00 session (start 2018-10-19 02:02:00 end 2018-10-19 03:03:00)
new 2 records in 1:50:00 session (start 2018-10-19 03:10:00 end 2018-10-19 05:00:00)
-

Thanks.