Hi,
My pipeline is a GCS -> Dataflow -> BigQuery pipeline, processing 2GiB gzipped
JSON files (about 90 of them).
With the Dataflow SDK I was able to run it in streaming/batch mode on the
Dataflow service for a couple of hours until it either ran out of memory or
threw exceptions.
After moving to Apache Beam, the step that extracts the timestamp and attaches
it to the element (ExtractTimestamps, full code here [0]) does not produce any
output anymore.
Could it be that the implementation changed? This step is the one that enables
windowing, which should make it possible to write to
partitions when running in streaming mode. According to the 0.5.0 code:
“If invoked from @ProcessElement, the timestamp must not be older than the
input element's timestamp minus DoFn#getAllowedTimestampSkew. The output
element will be in the same windows as the input element.”
Could this be the problem?
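For reference, a minimal sketch of what such a timestamp-extracting DoFn
typically looks like; this is not the code from [0], and the element type,
field name and timestamp format are assumptions:

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.transforms.DoFn;
import org.joda.time.Duration;
import org.joda.time.Instant;

class ExtractTimestamps extends DoFn<TableRow, TableRow> {

  // Per the javadoc quoted above, outputWithTimestamp() rejects timestamps
  // older than the input element's timestamp unless the skew allows it.
  @Override
  public Duration getAllowedTimestampSkew() {
    return Duration.standardDays(3650);
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    TableRow row = c.element();
    // Assumed: the parsed JSON carries an ISO-8601 "timestamp" field.
    Instant eventTime = Instant.parse((String) row.get("timestamp"));
    c.outputWithTimestamp(row, eventTime);
  }
}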
The pipeline is based on the idea from Dan’s post on StackOverflow [1]. I would
be fine running it in streaming mode, but the input data seems to be too much
to handle: it crashed in streaming mode (and in batch as well, but writing to
partitions in batch is experimental anyway).
I am running out of ideas at the moment. I would love to get an opinion on
either of these approaches:
1. Read all files, parse them, and push the records onto PubSub, then read from
PubSub in streaming mode (see the sketch after this list). Handle the daily
files in batch.
2. Build a big loop that splits the files between different pipelines, using
something like a semaphore worker pool, and processes them in two phases. In
the first phase, extract and parse each file and write an Avro file, as well as
a key-value map between dates and filenames. In phase 2, read the Avro files,
knowing the total number of days by then, and partition them accordingly to
load into BQ.
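For option 1, the publishing side could be as small as the sketch below. It is
written against the current Beam API (so the IO names differ from the 0.5.0
style in the snippet further down), and the bucket pattern and topic name are
placeholders:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

// Option 1, publish phase: re-read the gzipped logs from GCS in a small batch
// job and push the raw lines to PubSub. Parsing, timestamp extraction and
// windowing then happen in a separate streaming pipeline reading the topic.
public class PublishLogsToPubsub {
  public static void main(String[] args) {
    String bucket = "gs://<my-bucket>/logs/*.json.gz";  // placeholder pattern
    Pipeline publish =
        Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    publish
        .apply("Read logfiles", TextIO.read().from(bucket))  // gzip auto-detected
        .apply("Publish lines", PubsubIO.writeStrings()
            .to("projects/<my-project>/topics/<logs-topic>"));  // placeholder
    publish.run();
  }
}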
Tobi
p.apply("Read logfile", TextIO.Read.from(bucket))
.apply("Repartition", Repartition.of())
.apply("Parse JSON", ParDo.of(new ReadObjects()))
.apply("Extract timestamp", ParDo.of(new ExtractTimestamps()))
.apply("Window into days",
Window.into(FixedWindows.of(Duration.standardDays(1))))
.apply("Format Output", ParDo.of(new Outputter()))
.apply("Write to BQ", BigQueryIO.Write
.to(new DayPartitionFunc("dataset", tableName))
.withSchema(Outputter.getSchema())
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
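For context, DayPartitionFunc above follows the pattern from Dan’s answer [1]:
a SerializableFunction from the window to a per-day table spec. A rough sketch,
assuming the day is taken from the window start and appended as a partition
decorator:

import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

// Sketch only: maps each daily window to "dataset.table$yyyyMMdd" so that
// streaming inserts land in the matching partition.
class DayPartitionFunc implements SerializableFunction<BoundedWindow, String> {

  // static, so the formatter is not serialized with the function
  private static final DateTimeFormatter DAY = DateTimeFormat.forPattern("yyyyMMdd");

  private final String dataset;
  private final String table;

  DayPartitionFunc(String dataset, String table) {
    this.dataset = dataset;
    this.table = table;
  }

  @Override
  public String apply(BoundedWindow window) {
    String day = DAY.print(((IntervalWindow) window).start());
    return dataset + "." + table + "$" + day;  // "$" addresses the day partition
  }
}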
[0] https://gist.github.com/james-woods/2e8e91adf35faddce3814128852896ea#file-pipeline-java-L105
[1] http://stackoverflow.com/questions/38114306/creating-writing-to-parititoned-bigquery-table-via-google-cloud-dataflow?rq=1