What are you setting numShards to? This is likely to be the main bottleneck
in your pipeline.
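
To illustrate why the shard count matters, here is a rough sketch in plain
Java. It is not Beam code and not Beam's actual implementation. It assumes a
simplified model in which every element of a window is assigned to one of
numShards shards in round-robin order, and each shard is written by a single
writer. The class name, method names, and the element count are hypothetical.

```java
// Simplified model (an assumption, not Beam's real code): elements of one
// window are spread round-robin over numShards shards, and each shard is
// handled by exactly one writer.
class ShardLoadSketch {

    // Returns how many elements land on each shard.
    static long[] elementsPerShard(long elementCount, int numShards) {
        if (numShards < 1) {
            throw new IllegalArgumentException("numShards must be >= 1");
        }
        long[] load = new long[numShards];
        int shard = 0;
        for (long i = 0; i < elementCount; i++) {
            load[shard]++;
            shard = (shard + 1) % numShards;
        }
        return load;
    }

    // The busiest shard bounds how fast a window can be written out.
    static long maxLoad(long[] load) {
        long max = 0;
        for (long l : load) {
            max = Math.max(max, l);
        }
        return max;
    }

    public static void main(String[] args) {
        long elementsPerWindow = 600_000L; // hypothetical load per 1-minute window
        for (int numShards : new int[] {1, 4, 16}) {
            long busiest = maxLoad(elementsPerShard(elementsPerWindow, numShards));
            System.out.println("numShards=" + numShards
                    + " busiest shard handles " + busiest + " elements per window");
        }
    }
}
```

Under this model, a single shard means one writer handles every element of
each window, so the write step cannot spread the load no matter how many
workers the rest of the pipeline has. If numShards is 1, as the second
argument to WriteOneFilePerWindow in the quoted code suggests, trying a larger
value would be a cheap first experiment.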

On Fri, Mar 13, 2020 at 9:38 AM Anand Singh Kunwar <anandkunwa...@gmail.com>
wrote:

> Hi beam devs,
>
> Context:
> I have been experimenting with generating columnar data from Prometheus
> metric data and writing it to Google Cloud Storage. My pipeline reads
> Prometheus Remote Write HTTP payloads from Kafka (Snappy-compressed and
> protobuf-encoded). The first two steps of the pipeline decompress and
> decode each payload into a metric object. I then window this input into
> fixed windows of 1 minute and write each window to GCS in ORC format. I
> have been seeing huge lag in my pipeline.
>
> Problem/Bug:
> My custom sink for FileIO.write(), ORCIO, writes to GCS using the ORC
> library. To isolate the problem, I replaced every operation in the sink
> with a no-op, and even then I saw huge lag in my pipeline. When I comment
> out the FileIO transform (which is actually a no-op at that point), my
> pipeline keeps up with the input load.
> From searching online, my problem seems to be related to this question:
> https://stackoverflow.com/questions/54094102/beam-pipeline-kafka-to-hdfs-by-time-buckets
>
> This is what my code looks like:
>
> p.apply("ReadLines", KafkaIO.<Long, byte[]>read()
>         .withBootstrapServers("mykafka:9092")
>         .withTopic(options.getInputTopic())
>         .withConsumerConfigUpdates(ImmutableMap.of(
>                 ConsumerConfig.GROUP_ID_CONFIG, "custom-id",
>                 ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"))
>         .withKeyDeserializer(LongDeserializer.class)
>         .withValueDeserializer(ByteArrayDeserializer.class)
>         .withoutMetadata())
>     .apply("UncompressSnappy", ParDo.of(new UncompressSnappy()))
>     .apply("DecodeProto", ParDo.of(new DecodePromProto()))
>     .apply("MapTSSample", ParDo.of(new MapTSSample()))
>     .apply(Window.<TSSample>into(FixedWindows.of(Duration.standardMinutes(1)))
>         .withTimestampCombiner(TimestampCombiner.END_OF_WINDOW))
>     .apply(new WriteOneFilePerWindow(options.getOutput(), 1, ".orc"));
>
> This is what WriteOneFilePerWindow.java's expand looks like for me:
>
> public PDone expand(PCollection<TSSample> input) {
>     input.apply(FileIO.<TSSample>write()
>         .to(filenamePrefix)
>         .withNaming(new MyFileNaming(filenameSuffix))
>         .withNumShards(numShards)
>         .via(ORCIO.sink()));
>     return PDone.in(input.getPipeline());
> }
>
>
> Best
> Anand Singh Kunwar
>
