What are you setting numShards to? That is likely the main bottleneck in your pipeline.
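From the quoted WriteOneFilePerWindow below it looks like it's 1, which shuffles every window's data to a single writer. A minimal sketch of what I mean, using your snippet with one change (10 is a purely illustrative shard count; tune it to roughly match your worker parallelism):

    input.apply(FileIO.<TSSample>write()
        .to(filenamePrefix)
        .withNaming(new MyFileNaming(filenameSuffix))
        // Illustrative value: with numShards = 1, all elements of a window
        // are grouped onto one shard and written by a single worker; more
        // shards let the write fan out across workers.
        .withNumShards(10)
        .via(ORCIO.sink()));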
On Fri, Mar 13, 2020 at 9:38 AM Anand Singh Kunwar <anandkunwa...@gmail.com> wrote:
> Hi beam devs,
>
> Context:
> I have been experimenting with generating columnar data from Prometheus
> metric data to write to Google Cloud Storage. My pipeline takes Prometheus
> Remote Write HTTP payloads from Kafka (snappy-compressed and
> protobuf-encoded); the first two steps of the pipeline decompress and
> decode them into a metric object. I window this input into fixed windows
> of 1 minute and write each window to GCS in ORC format. I have been seeing
> huge lag in my pipeline.
>
> Problem/Bug:
> The custom FileIO.write().sink implementation for ORCIO writes to GCS
> using the ORC library. Even after implementing all operations in my sink
> as no-ops, I still saw huge lag in my pipeline. When I comment out the
> FileIO transform (which is actually a no-op), my pipeline keeps up with
> the input load.
> Looking online, my problem seems to relate to this:
> https://stackoverflow.com/questions/54094102/beam-pipeline-kafka-to-hdfs-by-time-buckets
>
> This is what my code looks like:
>
> p.apply("ReadLines", KafkaIO.<Long, byte[]>read()
>         .withBootstrapServers("mykafka:9092")
>         .withTopic(options.getInputTopic())
>         .withConsumerConfigUpdates(ImmutableMap.of(
>             ConsumerConfig.GROUP_ID_CONFIG, "custom-id",
>             ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"))
>         .withKeyDeserializer(LongDeserializer.class)
>         .withValueDeserializer(ByteArrayDeserializer.class)
>         .withoutMetadata())
>     .apply("UncompressSnappy", ParDo.of(new UncompressSnappy()))
>     .apply("DecodeProto", ParDo.of(new DecodePromProto()))
>     .apply("MapTSSample", ParDo.of(new MapTSSample()))
>     .apply(Window.<TSSample>into(FixedWindows.of(Duration.standardMinutes(1)))
>         .withTimestampCombiner(TimestampCombiner.END_OF_WINDOW))
>     .apply(new WriteOneFilePerWindow(options.getOutput(), 1, ".orc"));
>
> This is what WriteOneFilePerWindow.java's expand looks like for me:
>
> public PDone expand(PCollection<TSSample> input) {
>     input.apply(FileIO.<TSSample>write()
>         .to(filenamePrefix)
>         .withNaming(new MyFileNaming(filenameSuffix))
>         .withNumShards(numShards)
>         .via(ORCIO.sink()));
>     return PDone.in(input.getPipeline());
> }
>
> Best
> Anand Singh Kunwar