Thanks Luke!

When I started using the direct runner I was getting Out of Memory errors.
I incorrectly thought toggling on streaming would help eliminate/minimise
those errors. I started playing with windowing not realising a bounded IO
would treat everything as the same window even with streaming on.

I switched to the Flink runner after reading that the direct runner is
actually intended to tease out pipeline issues. I’ve been using the Flink
WebUI to monitor progress. Which has been helpful to visualise what
progress was made in the stream.

What I would ultimately like to achieve is reading from Cassandra and
writing the records to one or more Parquet files based on either count (Eg
10,000 records per file) and/or size on disk (Eg rotate to a new file when
64MB is exceeded).

The environment I’m testing against right now is a relatively small test
env with 100k records or so. Larger envs will be 1-100m.

Regards,
Nathan

On Wed, May 13, 2020 at 19:25, Luke Cwik <[email protected]> wrote:

> Since CassandraIO is not an unbounded source, the watermark will never
> advance until all the data is read from Cassandraw which means that you
> will buffer all the data if you (or any transform you use) has any event
> time based windowing strategy when grouping. You could swap to use a
> processing time windowing strategy (via Window.into) but it is unclear that
> is what you want.
>
> CassandraIO is not an unbounded source so why do you want --streaming?
> Also, why do you want to window.into fixed windows of 30 seconds?
>
>
> On Tue, May 12, 2020 at 8:28 AM Nathan Fisher <[email protected]>
> wrote:
>
>> Hi Folks,
>>
>> Cross-posting from the Slack channel from the other day.
>>
>> I started looking at Beam again over the weekend. I have an unbounded
>> stream with a CassandraIO input and am trying to write files using FileIO
>> and ParquetIO.
>>
>> I'm using the following:
>>
>> Beam: 2.20.0
>> Flink Runner/Cluster: 1.9(.3)
>>
>> java -Xmx12g -jar target/fmetrics-1.0-SNAPSHOT.jar --streaming=true
>> --sdkWorkerParallelism=0 --runner=FlinkRunner
>>
>> When submitting to a Flink cluster I include --flinkMaster=localhost:8081
>> in the command.
>>
>> If I replace the FileIO with a simple log writer it prints out the
>> records and makes progress. Using the FileIO with ParquetIO it stalls on
>> the stage write/WriteFiles/WriteShardedBundlesToTempFiles/GroupIntoShards
>> ->
>> write/WriteFiles/WriteShardedBundlesToTempFiles/WriteShardsIntoTempFiles/ParMultiDo(WriteShardsIntoTempFiles)
>> -> write/WriteFiles/GatherTempFileResults/Add void
>> key/AddKeys/Map/ParMultiDo(Anonymous).
>>
>> That brings me to ask the following questions:
>>
>>    1. What's the best way to test and monitor a beam pipeline?
>>    2. What adjustments are required to get this pipeline writing files?
>>    3. Is there some kind of way to evaluate the DAG and identify
>>    scenarios where this stall is likely?
>>
>>    PipelineOptions pipelineOptions = PipelineOptionsFactory
>>         .fromArgs(args)
>>         .withValidation()
>>         .create();
>>     Pipeline p = Pipeline.create(pipelineOptions);
>>     CoderRegistry registry = p.getCoderRegistry();
>>     registry.registerCoderForClass(GenericRecord.class, 
>> AvroCoder.of(SCHEMA));    PCollection<Metric> metrics = p.apply("cassandra",
>>         CassandraIO.<Metric>read()
>>             .withHosts(hosts)
>>             .withPort(9042)
>>             .withLocalDc("datacenter1")
>>             .withKeyspace(KEY_SPACE)
>>             .withTable(TABLE)
>>             .withMinNumberOfSplits(100)
>>             .withEntity(Metric.class)
>>             .withCoder(SerializableCoder.of(Metric.class)));
>>     metrics.apply("window",
>>         Window.<Metric>into(
>>             FixedWindows.of(Duration.standardSeconds(30)))
>>             .withAllowedLateness(Duration.standardSeconds(5))
>>             .accumulatingFiredPanes())
>>         .apply("metricToGeneric", ParDo.of(new MetricToGeneric(LOG)))
>>         .apply("write", FileIO.<GenericRecord>write()
>>             .via(ParquetIO.sink(SCHEMA))
>>             .withNumShards(200)
>>             .to("./metrics/")
>>             .withPrefix("metrics")
>>             .withSuffix(".parquet"));
>>     p.run().waitUntilFinish();
>>
>> I also loaded this into a Flink cluster and it appears to stall on the
>> temporary file sharding as outlined above and eventually fails after
>> processing about 600-700k records.
>>
>> Rereading the windowing section in the document I changed it to
>> discardFiredPanes() as it seems the more appropriate behaviour for what I
>> want but that doesn't appear to have changed the results any.
>>
>> Regards,
>> --
>> Nathan Fisher
>>  w: http://junctionbox.ca/
>>
> --
Nathan Fisher
 w: http://junctionbox.ca/

Reply via email to