Re: Using watermarks with bounded sources
Hi! The PR just got submitted. You can play with SDF in the Dataflow streaming runner now :) Hope it doesn't get rolled back (fingers crossed)... On Mon, Jun 19, 2017 at 6:06 PM Eugene Kirpichov wrote: > Hi, > The PR is ready and I'm just struggling with the setup of tests - Dataflow > ValidatesRunner tests currently don't have a streaming execution. > I think +Kenn Knowles was doing something about that, or > I might find a workaround. > > But basically if you want to experiment - if you patch in the PR, you can > experiment with SDF in Dataflow in streaming mode. It passes tests against > the current production Dataflow Service. > > > On Thu, Jun 15, 2017 at 8:54 AM peay wrote: > >> Eugene, would you have an ETA on when splittable DoFn would be available >> in Dataflow in batch/streaming mode? I see that >> https://github.com/apache/beam/pull/1898 is still active. >> >> I've started to experiment with it using the DirectRunner and this is >> a great API. >> >> Thanks! >> >> Original Message >> Subject: Re: Using watermarks with bounded sources >> >> Local Time: April 23, 2017 10:18 AM >> UTC Time: April 23, 2017 2:18 PM >> From: p...@protonmail.com >> To: Eugene Kirpichov >> user@beam.apache.org >> >> Ah, I didn't know about that. This is *really* great -- from a quick >> look, the API looks both very natural and very powerful. Thanks a lot for >> getting this into Beam! >> >> I see Flink support seems to have been merged already. Any idea on when >> https://github.com/apache/beam/pull/1898 will get merged? >> >> I see updateWatermark in the API but not in the proposal's examples which >> only use resume/withFutureOutputWatermark. Any reason why updateWatermark >> is not called after each output in the examples from the proposal? I guess >> that would be "too fine-grained" to update it for each individual record >> of a mini-batch? >> >> In my case with existing hourly files, would `outputElement(01:00 file), >> updateWatermark(01:00), outputElement(02:00), updateWatermark(02:00), ...` >> be the proper way to output per-hour elements while gradually moving the >> watermark forward as I go through an existing list? Or would you >> instead suggest to still use resume (potentially with very small timeouts)? >> >> Thanks, >> >> Original Message >> Subject: Re: Using watermarks with bounded sources >> Local Time: 22 April 2017 3:59 PM >> UTC Time: 22 April 2017 19:59 >> From: kirpic...@google.com >> To: peay , user@beam.apache.org < >> user@beam.apache.org> >> >> Hi! This is an excellent question; I don't have time to reply in much >> detail right now, but please take a look at >> http://s.apache.org/splittable-do-fn - it unifies the concepts of >> bounded and unbounded sources, and the use case you mentioned is one of the >> motivating examples. >> >> Also, see recent discussions on pipeline termination semantics: >> technically nothing should prevent an unbounded source from saying it's >> done "for real" (no new data will appear), it's just that the current UnboundedSource >> API does not expose such a method (but Splittable DoFn does). >> >> On Sat, Apr 22, 2017 at 11:15 AM peay wrote: >> >>> Hello, >>> >>> A use case I find myself running into frequently is the following: I >>> have daily or hourly files, and a Beam pipeline with small to moderate >>> size windows. (Actually, I've just seen that support for per-window files >>> in file-based sinks was recently checked in, which is one way to >>> get there).
>>> >>> Now, Beam has no clue about the fact that each file corresponds to a >>> given time interval. My understanding is that when running the pipeline in >>> batch mode with a bounded source, there is no notion of a watermark and we have >>> to load everything because we just don't know. This is pretty wasteful, >>> especially as you have to keep a lot of data in memory, while you could in >>> principle operate close to what you'd do in streaming mode: first read the >>> oldest files, then the newest files, moving the watermark forward as you go >>> through the input list of files. >>> >>> I see one way around this. Let's say that I have hourly files and let's >>> not assume anything about the order of records within the file to keep it >>> simple: I don't want a very precise record-level watermark, but more a >>> rough watermark at the granularity of hours. Say we can easily get the >>> corresponding time interval from the filename. One can make an unbounded >>> source that essentially acts as a "List of bounded file-based sources". If >>> there are K splits, split k can read every file that has `index % K == k` >>> in the time-ordered list of files. `advance` can advance within the current file, >>> and move on to the next one once no more records can be read. >>> >>> However, as far as I understand, this pipeline will never terminate >>> since this is an unbounded source and having the `advance` method of our >>> wrapping source return `false` won't make the pipeline terminate.
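A minimal sketch of the hourly-file pattern discussed above, written as a splittable DoFn in Java. The HourlyFile/HourlyFileList types and their readRecords/hourStart/hourEnd members are hypothetical stand-ins, and the restriction and watermark calls follow the Beam 2.0-era SDF surface mentioned in the thread (OffsetRange, OffsetRangeTracker, ProcessContext.updateWatermark); later Beam releases changed these signatures, so treat this as an illustration of the idea rather than the proposal's actual example.

import java.io.Serializable;
import java.util.Collections;
import java.util.List;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
import org.joda.time.Instant;

// Hypothetical description of one hourly file: where it lives and which hour it covers.
class HourlyFile implements Serializable {
  String path;
  Instant hourStart;
  Instant hourEnd;

  Iterable<String> readRecords() {
    return Collections.emptyList(); // placeholder: read and parse the file here
  }
}

// Hypothetical element: a time-ordered (oldest-first) list of hourly files.
class HourlyFileList implements Serializable {
  List<HourlyFile> files;
}

class ReadHourlyFilesFn extends DoFn<HourlyFileList, String> {

  @GetInitialRestriction
  public OffsetRange getInitialRestriction(HourlyFileList element) {
    // One restriction position per file; a runner may split this range across workers.
    return new OffsetRange(0, element.files.size());
  }

  @NewTracker
  public OffsetRangeTracker newTracker(OffsetRange range) {
    return new OffsetRangeTracker(range);
  }

  @ProcessElement
  public void process(ProcessContext c, OffsetRangeTracker tracker) {
    HourlyFileList element = c.element();
    for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) {
      HourlyFile file = element.files.get((int) i);
      for (String record : file.readRecords()) {
        c.outputWithTimestamp(record, file.hourStart);
      }
      // Advance the output watermark past the hour that was just finished, so that
      // windows covering older hours can close before the whole list has been read.
      c.updateWatermark(file.hourEnd);
    }
  }
}

Because the restriction is simply an index range over the ordered file list, a runner can split it across workers while each worker still emits its files oldest-first and reports a per-hour watermark, which is the rough, hour-granularity watermark described above.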
Re: Reading encrypted and lz compressed files
You'll want to extend FileBasedSource and pass in the fact that the file is not splittable and that the compression type is CompressionTypes.UNCOMPRESSED: https://github.com/apache/beam/blob/59598d8f41e65f9a068d7446457395e112dc3bc7/sdks/python/apache_beam/io/filebasedsource.py You'll want to override the "open_file" method to wrap the stream being returned so that the decompression/decryption occurs. On Tue, Jun 20, 2017 at 11:12 AM, Sachin Shetty wrote: > Thank you, Lukasz, for the link, I will try to build my custom source on the > same lines. > > Any pointers on how we can do this in Python? > > > > On Tue, Jun 20, 2017 at 8:29 PM, Lukasz Cwik wrote: > >> Take a look at CompressedSource: >> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java >> >> I feel as though you could follow the same pattern to decompress/decrypt >> the data as a wrapper. >> >> Apache Beam supports a concept of dynamic work rebalancing ( >> https://beam.apache.org/blog/2016/05/18/splitAtFraction-method.html) >> which allows a runner to utilize its fleet of machines more efficiently. >> For file-based sources, this would rely on being able to partition each >> input file into smaller pieces for parallel processing. Unless that >> compression/encryption method supports efficient seeking within the >> uncompressed data, the smallest granularity of work rebalancing you will >> have is at the whole file level (since decompressing/decrypting from the >> beginning of the file to read an arbitrary offset is usually very >> inefficient). >> >> On Tue, Jun 20, 2017 at 6:00 AM, Sachin Shetty >> wrote: >> >>> Hi, >>> >>> I am trying to process some of our access logs using Beam. Log files are >>> archived to a GCS bucket, but they are lz compressed and encrypted using >>> gpg. >>> >>> Any idea how I could load up the files into a pipeline without >>> decrypting, decompressing and staging the file before feeding it to a Beam >>> pipeline? >>> >>> I see that I could write a custom coder, but I could not figure out the >>> specifics. >>> >>> Thanks >>> Sachin >>> >> >> >
Re: Reading encrypted and lz compressed files
Thank you, Lukasz, for the link, I will try to build my custom source on the same lines. Any pointers on how we can do this in Python? On Tue, Jun 20, 2017 at 8:29 PM, Lukasz Cwik wrote: > Take a look at CompressedSource: > https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java > > I feel as though you could follow the same pattern to decompress/decrypt > the data as a wrapper. > > Apache Beam supports a concept of dynamic work rebalancing ( > https://beam.apache.org/blog/2016/05/18/splitAtFraction-method.html) > which allows a runner to utilize its fleet of machines more efficiently. > For file-based sources, this would rely on being able to partition each > input file into smaller pieces for parallel processing. Unless that > compression/encryption method supports efficient seeking within the > uncompressed data, the smallest granularity of work rebalancing you will > have is at the whole file level (since decompressing/decrypting from the > beginning of the file to read an arbitrary offset is usually very > inefficient). > > On Tue, Jun 20, 2017 at 6:00 AM, Sachin Shetty > wrote: > >> Hi, >> >> I am trying to process some of our access logs using Beam. Log files are >> archived to a GCS bucket, but they are lz compressed and encrypted using >> gpg. >> >> Any idea how I could load up the files into a pipeline without >> decrypting, decompressing and staging the file before feeding it to a Beam >> pipeline? >> >> I see that I could write a custom coder, but I could not figure out the >> specifics. >> >> Thanks >> Sachin >> > >
Re: Is this a valid use case for Apache Beam and Google Dataflow
Take a look at session windows[1]. As long as the messages you post to Pub/Sub aren't spaced out farther than the session gap duration, they will all get grouped together. It seems as though it would be much simpler to just run a separate Apache Beam job for each internal job you want to process since you won't have to deal with potentially late data exceeding the session gap duration. 1: https://cloud.google.com/dataflow/model/windowing#session-windows On Tue, Jun 20, 2017 at 9:10 AM, Randal Moore wrote: > Just started looking at Beam this week as a candidate for executing some > fairly CPU-intensive work. I am curious if the stream-oriented features of > Beam are a match for my use case. My user will submit a large number of > computations to the system (as a "job"). Those computations can be > expressed in a series of DoFns whose results can be stored (in, for example, > Google Datastore). > > My initial idea was to post the individual messages (anywhere from 1000 to > 1,000,000 per job) to a Google Pub/Sub topic that is watched by a Google > Dataflow job. I can write a prototype that performs the appropriate > transformation and posts the results. > > But... > > I cannot find any way to capture the notion of the completion of the > original "user job". I want to post to Pub/Sub that all individual > calculations are complete. I was hoping that I could write a CombineFn that > would be able to post a message as each job finishes, but Combine needs a > window and I don't see how to define it. > > Is there a way to define a window that is defined by the user's job - I > know exactly which individual computations are part of the user's job (and > exactly how many). But all the grouping that I've discovered so far seems > to be well defined at compile time (e.g., number of messages in the window, > or number of partitions). > > Is this the wrong use case for Dataflow/Beam? Is there a better way to > express this problem? > > Thanks, > rdm >
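As a rough Java illustration of the session-window suggestion above: this hedged sketch assumes an upstream step has already keyed each completed-work-item message by a jobId string, and it picks an arbitrary 10-minute gap duration. Counting completions per job (to compare against the known total downstream) is only one way to react to a closed session; the element and key types are hypothetical, not from the thread.

import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

public class SessionPerJob {
  // Input: (jobId, workItemId) pairs emitted as individual computations finish.
  static PCollection<KV<String, Long>> completedCountPerJob(PCollection<KV<String, String>> results) {
    return results
        // All results for a jobId that arrive within 10 minutes of each other
        // fall into one session window for that key.
        .apply(Window.<KV<String, String>>into(Sessions.withGapDuration(Duration.standardMinutes(10))))
        // When a job's session closes, this emits one (jobId, count) pair; a later DoFn
        // could compare the count with the expected total and publish to Pub/Sub.
        .apply(Count.perKey());
  }
}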
Re: What state is buffered when using Combine.perKey with an accumulator?
Hi Kenn, Thanks for the reply, that makes sense. As far as I can tell, the DirectPipelineRunner doesn't do this optimisation (when I test the pipeline locally), but I guess the DataflowRunner will. Josh On Tue, Jun 20, 2017 at 4:26 PM, Kenneth Knowles wrote: > Hi Josh, > > Exactly what is stored technically depends on optimization decisions by > the runner. But you can generally expect that only the accumulator is > stored across trigger firings, not the input elements. > > Kenn > > On Tue, Jun 20, 2017 at 6:32 AM, Josh wrote: > >> Hi all, >> >> I have a question about how much state is buffered when using >> Combine.perKey with a custom accumulator. For example, I have: >> >> PCollection> elements = ...; >> >> PCollection> topValuesPerKey = elements >> >> .apply(Window.into(new GlobalWindows()) >> >> .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane() >> >> .plusDelayOf(Duration.standardSeconds(10)))) >> >> .accumulatingFiredPanes()) >> >> .apply(Combine.perKey(new MyCombineFunction())); >> >> >> Here MyCombineFunction is, for each key, counting the occurrences of each >> value. Its output for each key is a List of the values that occur >> most frequently. In this case the accumulator for each key just stores a >> Map of values and their associated counts. >> >> >> My question is - since I am accumulatingFiredPanes forever on the global >> window - is every element going to be buffered forever (i.e. amount of >> space needed will constantly increase)? Or, is the amount of state buffered >> determined by my accumulator (i.e. determined by the number of unique >> values across all keys)? If the former is the case, how can I optimise my >> job so that the accumulator is the only state stored across panes? >> >> >> Thanks for any advice, >> >> Josh >> > >
Is this a valid use case for Apache Beam and Google Dataflow
Just started looking at Beam this week as a candidate for executing some fairly CPU-intensive work. I am curious if the stream-oriented features of Beam are a match for my use case. My user will submit a large number of computations to the system (as a "job"). Those computations can be expressed in a series of DoFns whose results can be stored (in, for example, Google Datastore). My initial idea was to post the individual messages (anywhere from 1000 to 1,000,000 per job) to a Google Pub/Sub topic that is watched by a Google Dataflow job. I can write a prototype that performs the appropriate transformation and posts the results. But... I cannot find any way to capture the notion of the completion of the original "user job". I want to post to Pub/Sub that all individual calculations are complete. I was hoping that I could write a CombineFn that would be able to post a message as each job finishes, but Combine needs a window and I don't see how to define it. Is there a way to define a window that is defined by the user's job - I know exactly which individual computations are part of the user's job (and exactly how many). But all the grouping that I've discovered so far seems to be well defined at compile time (e.g., number of messages in the window, or number of partitions). Is this the wrong use case for Dataflow/Beam? Is there a better way to express this problem? Thanks, rdm
Re: What state is buffered when using Combine.perKey with an accumulator?
Hi Josh, Exactly what is stored technically depends on optimization decisions by the runner. But you can generally expect that only the accumulator is stored across trigger firings, not the input elements. Kenn On Tue, Jun 20, 2017 at 6:32 AM, Josh wrote: > Hi all, > > I have a question about how much state is buffered when using > Combine.perKey with a custom accumulator. For example, I have: > > PCollection> elements = ...; > > PCollection> topValuesPerKey = elements > > .apply(Window.into(new GlobalWindows()) > > .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane() > > .plusDelayOf(Duration.standardSeconds(10)))) > > .accumulatingFiredPanes()) > > .apply(Combine.perKey(new MyCombineFunction())); > > > Here MyCombineFunction is, for each key, counting the occurrences of each > value. Its output for each key is a List of the values that occur > most frequently. In this case the accumulator for each key just stores a > Map of values and their associated counts. > > > My question is - since I am accumulatingFiredPanes forever on the global > window - is every element going to be buffered forever (i.e. amount of > space needed will constantly increase)? Or, is the amount of state buffered > determined by my accumulator (i.e. determined by the number of unique > values across all keys)? If the former is the case, how can I optimise my > job so that the accumulator is the only state stored across panes? > > > Thanks for any advice, > > Josh >
Re: Reading encrypted and lz compressed files
Take a look at CompressedSource: https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java I feel as though you could follow the same pattern to decompress/decrypt the data as a wrapper. Apache Beam supports a concept of dynamic work rebalancing ( https://beam.apache.org/blog/2016/05/18/splitAtFraction-method.html) which allows a runner to utilize its fleet of machines more efficiently. For file-based sources, this would rely on being able to partition each input file into smaller pieces for parallel processing. Unless that compression/encryption method supports efficient seeking within the uncompressed data, the smallest granularity of work rebalancing you will have is at the whole file level (since decompressing/decrypting from the beginning of the file to read an arbitrary offset is usually very inefficient). On Tue, Jun 20, 2017 at 6:00 AM, Sachin Shetty wrote: > Hi, > > I am trying to process some of our access logs using Beam. Log files are > archived to a GCS bucket, but they are lz compressed and encrypted using > gpg. > > Any idea how I could load up the files into a pipeline without > decrypting, decompressing and staging the file before feeding it to a Beam > pipeline? > > I see that I could write a custom coder, but I could not figure out the > specifics. > > Thanks > Sachin >
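To make the wrapper idea concrete, here is a hedged JDK-only sketch of just the stream-decoration step: GZIP stands in for the LZ codec from the thread, and the GPG step is a pass-through placeholder (a real implementation would plug in a PGP library such as Bouncy Castle). Wiring this into a CompressedSource-style channel factory, or into a Python FileBasedSource's open_file as suggested elsewhere in the thread, is left out.

import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.util.zip.GZIPInputStream;

// Sketch of the "wrap the stream" idea: given the channel for a raw file, return a channel
// that yields decrypted, then decompressed bytes. A CompressedSource-style reader would
// apply something like this when it opens each file.
public class DecryptingDecompressingChannel {

  public static ReadableByteChannel wrap(ReadableByteChannel raw) throws IOException {
    InputStream encrypted = Channels.newInputStream(raw);
    // Placeholder GPG step: substitute a real PGP decryption stream here.
    InputStream decrypted = decrypt(encrypted);
    // GZIP is only a stand-in; substitute the decompressor for the actual LZ format.
    InputStream decompressed = new GZIPInputStream(decrypted);
    return Channels.newChannel(decompressed);
  }

  private static InputStream decrypt(InputStream in) {
    return in; // pass-through placeholder
  }
}

As noted above, because neither GPG nor most LZ codecs support random access into the uncompressed data, a source built around this wrapper should mark each file as unsplittable, so the whole file remains the unit of work.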
Re: AppEngine & beam
Have you tried the AppEngine flex environment? I know that users have tried AppEngine standard with the Java SDK and have hit limitations of the standard environment which are not easy to resolve. The solution has always been to suggest users try the flex environment ( https://cloud.google.com/appengine/docs/flexible/). On Tue, Jun 20, 2017 at 6:31 AM, Tolsa, Camille < camille.tolsa-...@veolia.com> wrote: > Hi team, > > I would like to run Beam pipelines from an AppEngine service. I tried to > run them in a standard environment and I'm facing Beam-related issues. > > This line crashes: > import apache_beam as beam > > Trace (file: dill/dill.py:68): > ImportError: Cannot re-init internal module __main__ > > I wonder if it can be used in a standard environment? > Do you have any clues about this? >
Re: WordCount "akka" "Disassociated" Error on Beam on long-running Flink YARN-cluster / long-running Local Flink Cluster
The solution (in this case) was to swap the Flink binary for the compatible Scala version. Peering deeper into the Flink JobManager logs I found: 2017-06-19 14:07:53,459 ERROR Remoting - scala.Option; local class incompatible: stream classdesc serialVersionUID = -2062608324514658839, local class serialVersionUID = -114498752079829388 java.io.InvalidClassException: scala.Option; local class incompatible: stream classdesc serialVersionUID = -2062608324514658839, local class serialVersionUID = -114498752079829388 at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:616) ... This led me to try the binary for Flink 1.2.1 + Hadoop 2.6 + Scala 2.10, rather than the binary for Flink 1.2.1 + Hadoop 2.6 + Scala 2.11 I used originally. (https://flink.apache.org/downloads.html) This switch solved the problem (or, at least, I didn't have the problem after the switch). This should actually have been evident to me from the mvn dependency:tree step in the FlinkRunner guide. ( https://beam.apache.org/documentation/runners/flink/) The relevant line from that step says: [INFO] | +- org.apache.flink:flink-streaming-java_2.10:jar:1.1.2:runtime And the "2.10" is what tips us off to the need for a Flink version built against Scala 2.10 instead of Scala 2.11 (I think). If so, that should be mentioned more explicitly on the FlinkRunner documentation page. Cheers! On Fri, Jun 16, 2017 at 4:44 AM, Aljoscha Krettek wrote: > Hi Chris, > > I just followed your process myself (getting Flink 1.2.1, starting in > local cluster mode, running Beam word-count Quickstart on cluster) and > everything worked for me. Could you double check whether the JobManager is > reachable under the expected address? > > On another note, you can also run Beam jobs on Flink with the usual > bin/flink tool, i.e. to submit as a one-job YARN session or to submit to a > running YARN Flink cluster: > bin/flink run -c main-class path/to/jar.jar > > The arguments would be exactly the same as the ones you > used before. > > Best, > Aljoscha > > > On 15. Jun 2017, at 17:44, Chris Hebert digitalreasoning.com> wrote: > > > > Hi, > > > > The error is pasted below my procedure. > > > > > > ### My Procedure for Beam on "a long-running Local Flink Cluster": > > > > Beam WordCount Quickstart: https://beam.apache.org/get-started/quickstart-java/ > > > > Run: > > > cd /user/me/beam > > > mvn archetype:generate \ > > > -DarchetypeGroupId=org.apache.beam \ > > > -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \ > > > -DarchetypeVersion=2.0.0 \ > > > -DgroupId=org.example \ > > > -DartifactId=word-count-beam \ > > > -Dversion="0.1" \ > > > -Dpackage=org.apache.beam.examples \ > > > -DinteractiveMode=false > > > > Beam on FlinkRunner Guide: https://beam.apache.org/documentation/runners/flink/ > > > > Navigate into word-count-beam and identify the appropriate Flink version > to be 1.2.1: > > > cd word-count-beam > > > mvn dependency:tree -Pflink-runner | grep flink > > > > Local Flink Cluster Quickstart Guide: https://ci.apache.org/projects/flink/flink-docs-release-1.2/quickstart/setup_quickstart.html > > > > Follow the Local Flink Cluster Quickstart Guide. The "Apache Flink Web > Dashboard" opens in a browser showing jobs successfully running and > completed as I submit them. I keep this running.
> > > > Back in /user/me/beam/word-count-beam/, run: > > > mvn package exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \ > > > -Dexec.args=" \ > > > --runner=FlinkRunner \ > > > --flinkMaster=localhost:6123 \ > > > --filesToStage=target/word-count-beam-0.1.jar \ > > > --inputFile=/user/me/beam/word-count-beam/pom.xml \ > > > --output=/user/me/beam/word-count-beam/output_01" \ > > > -Pflink-runner > > > > The flinkMaster host:port is identified in the JobManager tab of the > Apache Flink Web Dashboard. Note that the Beam guide says to use > "--filesToStage=target/word-count-beam-bundled-0.1.jar", but Maven > actually only builds "target/word-count-beam-0.1.jar". > > > > The above command runs until it reaches the errors pasted below. The job > never makes it onto the Apache Flink Web Dashboard, and no output is > produced. > > > > Note that the following command (under the "Flink-local" tab on the Beam > Quickstart Guide) works fine, but it starts its own instance of a Local > Flink Cluster. The job never makes it onto the Apache Flink Web Dashboard > of my long-standing Local Flink Cluster I set up above. This makes sense, > because it doesn't use "-m" to connect to the long-running JobManager. > > > mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \ > > > -Dexec.args=" \ > > > --runner=FlinkRunner \ > > > --inputFile=pom.xml \ > > > --output=counts" \ > > > -Pflink-runner > > > > > > ### My Procedure for Beam on "a long-running Flink Cluster on YARN": > > > > Flink on YARN Setup: https://c
What state is buffered when using Combine.perKey with an accumulator?
Hi all, I have a question about how much state is buffered when using Combine.perKey with a custom accumulator. For example, I have: PCollection> elements = ...; PCollection> topValuesPerKey = elements .apply(Window.into(new GlobalWindows()) .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane() .plusDelayOf(Duration.standardSeconds(10)))) .accumulatingFiredPanes()) .apply(Combine.perKey(new MyCombineFunction())); Here MyCombineFunction is, for each key, counting the occurrences of each value. Its output for each key is a List of the values that occur most frequently. In this case the accumulator for each key just stores a Map of values and their associated counts. My question is - since I am accumulatingFiredPanes forever on the global window - is every element going to be buffered forever (i.e. amount of space needed will constantly increase)? Or, is the amount of state buffered determined by my accumulator (i.e. determined by the number of unique values across all keys)? If the former is the case, how can I optimise my job so that the accumulator is the only state stored across panes? Thanks for any advice, Josh
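The thread never shows MyCombineFunction itself, so the following is a hypothetical Java sketch of what such a CombineFn could look like (String values, an arbitrary top-3 output). It is meant to illustrate Kenn's point above: the per-key state that generally needs to survive across trigger firings is just this map-shaped accumulator, not the buffered input elements.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.transforms.Combine;

// Hypothetical "MyCombineFunction"-style CombineFn: the accumulator is a value -> count map,
// and the output is the most frequent values for the key.
public class TopValuesFn extends Combine.CombineFn<String, Map<String, Long>, List<String>> {

  private static final int TOP_N = 3; // arbitrary choice for the sketch

  @Override
  public Map<String, Long> createAccumulator() {
    return new HashMap<>();
  }

  @Override
  public Map<String, Long> addInput(Map<String, Long> accum, String value) {
    accum.merge(value, 1L, Long::sum);
    return accum;
  }

  @Override
  public Map<String, Long> mergeAccumulators(Iterable<Map<String, Long>> accums) {
    Map<String, Long> merged = new HashMap<>();
    for (Map<String, Long> accum : accums) {
      accum.forEach((v, count) -> merged.merge(v, count, Long::sum));
    }
    return merged;
  }

  @Override
  public List<String> extractOutput(Map<String, Long> accum) {
    List<Map.Entry<String, Long>> entries = new ArrayList<>(accum.entrySet());
    entries.sort((a, b) -> Long.compare(b.getValue(), a.getValue()));
    List<String> top = new ArrayList<>();
    for (int i = 0; i < Math.min(TOP_N, entries.size()); i++) {
      top.add(entries.get(i).getKey());
    }
    return top;
  }
}

With an accumulator shaped like this, the state per key grows with the number of distinct values seen for that key, which corresponds to the second alternative in the question above rather than buffering every input element.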
AppEngine & beam
Hi team, I would like to run Beam pipelines from an AppEngine service. I tried to run them in a standard environment and I'm facing Beam-related issues. This line crashes: import apache_beam as beam Trace (file: dill/dill.py:68): ImportError: Cannot re-init internal module __main__ I wonder if it can be used in a standard environment? Do you have any clues about this?
Reading encrypted and lz compressed files
Hi, I am trying to process some of our access logs using Beam. Log files are archived to a GCS bucket, but they are lz compressed and encrypted using gpg. Any idea how I could load up the files into a pipeline without decrypting, decompressing and staging the file before feeding it to a Beam pipeline? I see that I could write a custom coder, but I could not figure out the specifics. Thanks Sachin