Since CassandraIO is not an unbounded source, the watermark will never advance until all the data is read from Cassandra, which means that you will buffer all the data if you (or any transform you use) have an event-time-based windowing strategy when grouping. You could switch to a processing-time windowing strategy (via Window.into), but it is unclear whether that is what you want.
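As a rough illustration of the processing-time alternative (a sketch only, reusing the `metrics` PCollection and `Metric` entity class from the quoted pipeline below — not a recommendation that this is the right strategy here), the fixed event-time windows could be replaced with a global window that fires on a processing-time trigger, so emission is not gated on the watermark:

```java
// Sketch: global window triggered on processing time instead of
// event-time FixedWindows, so panes fire without waiting for the
// watermark to advance. `metrics` is the PCollection<Metric> read
// from CassandraIO in the quoted pipeline.
metrics.apply("window",
    Window.<Metric>into(new GlobalWindows())
        // Fire a pane roughly every 30 seconds of processing time
        // after the first element arrives in the pane.
        .triggering(Repeatedly.forever(
            AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(Duration.standardSeconds(30))))
        // Required when specifying a non-default trigger; there is
        // no event-time lateness to account for here.
        .withAllowedLateness(Duration.ZERO)
        // Each pane carries only the new elements since the last firing.
        .discardingFiredPanes());
```

Note that a non-default trigger requires withAllowedLateness() to be set, and FileIO.write() would also need withWindowedWrites() for this to produce per-pane output.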
CassandraIO is not an unbounded source, so why do you want --streaming? Also, why do you want to Window.into fixed windows of 30 seconds?

On Tue, May 12, 2020 at 8:28 AM Nathan Fisher <[email protected]> wrote:

> Hi Folks,
>
> Cross-posting from the Slack channel from the other day.
>
> I started looking at Beam again over the weekend. I have an unbounded
> stream with a CassandraIO input and am trying to write files using FileIO
> and ParquetIO.
>
> I'm using the following:
>
> Beam: 2.20.0
> Flink Runner/Cluster: 1.9(.3)
>
> java -Xmx12g -jar target/fmetrics-1.0-SNAPSHOT.jar --streaming=true
> --sdkWorkerParallelism=0 --runner=FlinkRunner
>
> When submitting to a Flink cluster I include --flinkMaster=localhost:8081
> in the command.
>
> If I replace the FileIO with a simple log writer it prints out the records
> and makes progress. Using the FileIO with ParquetIO it stalls on the stage
> write/WriteFiles/WriteShardedBundlesToTempFiles/GroupIntoShards ->
> write/WriteFiles/WriteShardedBundlesToTempFiles/WriteShardsIntoTempFiles/ParMultiDo(WriteShardsIntoTempFiles) ->
> write/WriteFiles/GatherTempFileResults/Add void key/AddKeys/Map/ParMultiDo(Anonymous).
>
> That brings me to ask the following questions:
>
> 1. What's the best way to test and monitor a Beam pipeline?
> 2. What adjustments are required to get this pipeline writing files?
> 3. Is there some kind of way to evaluate the DAG and identify scenarios
> where this stall is likely?
>
> PipelineOptions pipelineOptions = PipelineOptionsFactory
>     .fromArgs(args)
>     .withValidation()
>     .create();
> Pipeline p = Pipeline.create(pipelineOptions);
> CoderRegistry registry = p.getCoderRegistry();
> registry.registerCoderForClass(GenericRecord.class, AvroCoder.of(SCHEMA));
>
> PCollection<Metric> metrics = p.apply("cassandra",
>     CassandraIO.<Metric>read()
>         .withHosts(hosts)
>         .withPort(9042)
>         .withLocalDc("datacenter1")
>         .withKeyspace(KEY_SPACE)
>         .withTable(TABLE)
>         .withMinNumberOfSplits(100)
>         .withEntity(Metric.class)
>         .withCoder(SerializableCoder.of(Metric.class)));
>
> metrics.apply("window",
>         Window.<Metric>into(
>                 FixedWindows.of(Duration.standardSeconds(30)))
>             .withAllowedLateness(Duration.standardSeconds(5))
>             .accumulatingFiredPanes())
>     .apply("metricToGeneric", ParDo.of(new MetricToGeneric(LOG)))
>     .apply("write", FileIO.<GenericRecord>write()
>         .via(ParquetIO.sink(SCHEMA))
>         .withNumShards(200)
>         .to("./metrics/")
>         .withPrefix("metrics")
>         .withSuffix(".parquet"));
>
> p.run().waitUntilFinish();
>
> I also loaded this into a Flink cluster and it appears to stall on the
> temporary file sharding as outlined above and eventually fails after
> processing about 600-700k records.
>
> Rereading the windowing section in the documentation I changed it to
> discardingFiredPanes() as it seems the more appropriate behaviour for what
> I want, but that doesn't appear to have changed the results any.
>
> Regards,
> --
> Nathan Fisher
> w: http://junctionbox.ca/
