Many people use the local Flink runner as their production runner. The direct runner doesn't have any "disaster" recovery options so you can only kill the process and rerun the pipeline. Also, the direct runner doesn't use any disk so as long as your working set for your pipeline fits in memory it will work well. You'll want to turn off some features[1, 2] that cause it to be slower since it performs extra validation to help with testing for users.
1: https://github.com/apache/beam/blob/4e47dea8283b5bf2d628bcd1642606816e78fd63/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java#L47 2: https://github.com/apache/beam/blob/4e47dea8283b5bf2d628bcd1642606816e78fd63/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java#L55 On Thu, May 14, 2020 at 8:47 PM Nathan Fisher <[email protected]> wrote: > Awesome thanks Luke! I mirrored a subset of the data (20k records) to a > local C* node and stripped out all of the windowing. It seems to be working > fine in batch mode. I think it was the confluence of my inexperience with > Beam and network latency that ultimately had me pulling out my hair. > > I started using the Direct Runner. Is there any guidance for what scale > you'd advise against using it? To be honest the local flink runner was so > easy I'm surprised it's not the default in the getting started > documentation. > > On Thu, May 14, 2020 at 1:31 PM Luke Cwik <[email protected]> wrote: > >> You could still use streaming mode but you need to ensure that you use a >> windowing strategy with a trigger that fires based upon processing time or >> # records and not event time. This wouldn't require buffering all the data >> as it is being read and would help with memory issues for runners that >> don't have access to effectively infinite memory or offload to disk. >> >> On Wed, May 13, 2020 at 5:04 PM Nathan Fisher <[email protected]> >> wrote: >> >>> Thanks Luke! >>> >>> When I started using the direct runner I was getting Out of Memory >>> errors. I incorrectly thought toggling on streaming would help >>> eliminate/minimise those errors. I started playing with windowing not >>> realising a bounded IO would treat everything as the same window even with >>> streaming on. >>> >>> I switched to the Flink runner after reading that the direct runner is >>> actually intended to tease out pipeline issues. I’ve been using the Flink >>> WebUI to monitor progress. Which has been helpful to visualise what >>> progress was made in the stream. >>> >>> What I would ultimately like to achieve is reading from Cassandra and >>> writing the records to one or more Parquet files based on either count (Eg >>> 10,000 records per file) and/or size on disk (Eg rotate to a new file when >>> 64MB is exceeded). >>> >>> The environment I’m testing against right now is a relatively small test >>> env with 100k records or so. Larger envs will be 1-100m. >>> >>> Regards, >>> Nathan >>> >>> On Wed, May 13, 2020 at 19:25, Luke Cwik <[email protected]> wrote: >>> >>>> Since CassandraIO is not an unbounded source, the watermark will never >>>> advance until all the data is read from Cassandraw which means that you >>>> will buffer all the data if you (or any transform you use) has any event >>>> time based windowing strategy when grouping. You could swap to use a >>>> processing time windowing strategy (via Window.into) but it is unclear that >>>> is what you want. >>>> >>>> CassandraIO is not an unbounded source so why do you want --streaming? >>>> Also, why do you want to window.into fixed windows of 30 seconds? >>>> >>>> >>>> On Tue, May 12, 2020 at 8:28 AM Nathan Fisher <[email protected]> >>>> wrote: >>>> >>>>> Hi Folks, >>>>> >>>>> Cross-posting from the Slack channel from the other day. >>>>> >>>>> I started looking at Beam again over the weekend. I have an unbounded >>>>> stream with a CassandraIO input and am trying to write files using >>>>> FileIO and ParquetIO. >>>>> >>>>> I'm using the following: >>>>> >>>>> Beam: 2.20.0 >>>>> Flink Runner/Cluster: 1.9(.3) >>>>> >>>>> java -Xmx12g -jar target/fmetrics-1.0-SNAPSHOT.jar --streaming=true >>>>> --sdkWorkerParallelism=0 --runner=FlinkRunner >>>>> >>>>> When submitting to a Flink cluster I include >>>>> --flinkMaster=localhost:8081 in the command. >>>>> >>>>> If I replace the FileIO with a simple log writer it prints out the >>>>> records and makes progress. Using the FileIO with ParquetIO it stalls >>>>> on the stage >>>>> write/WriteFiles/WriteShardedBundlesToTempFiles/GroupIntoShards >>>>> -> >>>>> write/WriteFiles/WriteShardedBundlesToTempFiles/WriteShardsIntoTempFiles/ParMultiDo(WriteShardsIntoTempFiles) >>>>> -> write/WriteFiles/GatherTempFileResults/Add void >>>>> key/AddKeys/Map/ParMultiDo(Anonymous). >>>>> >>>>> That brings me to ask the following questions: >>>>> >>>>> 1. What's the best way to test and monitor a beam pipeline? >>>>> 2. What adjustments are required to get this pipeline writing >>>>> files? >>>>> 3. Is there some kind of way to evaluate the DAG and identify >>>>> scenarios where this stall is likely? >>>>> >>>>> PipelineOptions pipelineOptions = PipelineOptionsFactory >>>>> .fromArgs(args) >>>>> .withValidation() >>>>> .create(); >>>>> Pipeline p = Pipeline.create(pipelineOptions); >>>>> CoderRegistry registry = p.getCoderRegistry(); >>>>> registry.registerCoderForClass(GenericRecord.class, >>>>> AvroCoder.of(SCHEMA)); PCollection<Metric> metrics = >>>>> p.apply("cassandra", >>>>> CassandraIO.<Metric>read() >>>>> .withHosts(hosts) >>>>> .withPort(9042) >>>>> .withLocalDc("datacenter1") >>>>> .withKeyspace(KEY_SPACE) >>>>> .withTable(TABLE) >>>>> .withMinNumberOfSplits(100) >>>>> .withEntity(Metric.class) >>>>> .withCoder(SerializableCoder.of(Metric.class))); >>>>> metrics.apply("window", >>>>> Window.<Metric>into( >>>>> FixedWindows.of(Duration.standardSeconds(30))) >>>>> .withAllowedLateness(Duration.standardSeconds(5)) >>>>> .accumulatingFiredPanes()) >>>>> .apply("metricToGeneric", ParDo.of(new MetricToGeneric(LOG))) >>>>> .apply("write", FileIO.<GenericRecord>write() >>>>> .via(ParquetIO.sink(SCHEMA)) >>>>> .withNumShards(200) >>>>> .to("./metrics/") >>>>> .withPrefix("metrics") >>>>> .withSuffix(".parquet")); >>>>> p.run().waitUntilFinish(); >>>>> >>>>> I also loaded this into a Flink cluster and it appears to stall on the >>>>> temporary file sharding as outlined above and eventually fails after >>>>> processing about 600-700k records. >>>>> >>>>> Rereading the windowing section in the document I changed it to >>>>> discardFiredPanes() as it seems the more appropriate behaviour for what I >>>>> want but that doesn't appear to have changed the results any. >>>>> >>>>> Regards, >>>>> -- >>>>> Nathan Fisher >>>>> w: http://junctionbox.ca/ >>>>> >>>> -- >>> Nathan Fisher >>> w: http://junctionbox.ca/ >>> >> > > -- > Nathan Fisher > w: http://junctionbox.ca/ >
