Hey Spark Experts,

After listening to some of you, and to the presentations at Spark Summit in
SF, I am transitioning from DStreams to Structured Streaming; however, I am
seeing some weird results.

My use case is as follows: I am reading a stream from a Kafka topic,
transforming each message, and writing the transformed message to another
Kafka topic.

While my stream is running, I can see the transformed messages on the
output topic, so the basic structure of my stream seems to be working as
intended.

Inside my transformation, I log the total transform time as well as the
raw message being transformed. (I'm using Java, by the way.)

The two weird things I am seeing:
1) The consumer lag for this particular consumer group on the input topic
is increasing. This does not make sense to me: judging by the transform
times in the logs, the stream should easily keep up with the incoming feed.
For example, the transform times are < 10 ms per record, and the sample
data never exceeds 100 messages per second. The stream should be reducing
consumer lag as it runs (especially with multiple workers and partitions).

2) I am seeing the same transformation log messages over and over in the
Dataproc Spark cluster logs. For example, the last 20+ log messages I am
currently looking at are exactly the same.
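Rough numbers for 1, assuming the load is spread evenly over c tasks running in parallel (c is my own notation, and this ignores per-batch scheduling and Kafka I/O overhead), with λ the input rate and s the transform time per record:

```latex
\rho = \frac{\lambda s}{c}
     < \frac{(100\ \text{msg/s}) \times (0.010\ \text{s/msg})}{c}
     = \frac{1}{c}
```

So a single task would be close to fully busy, but with two or more partitions processed concurrently each task should stay below 50% utilization, which is why I'd expect the lag to shrink.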

I thought 2 might be due to offsets not being handled correctly, but I am
seeing a reasonable range of transformed messages on the target topic, and
I'm using Spark's built-in checkpointing to handle the offsets for me.
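One thing I still want to rule out for 1: the Structured Streaming + Kafka Integration Guide says the Kafka source doesn't commit any offsets; the query's progress lives in the checkpoint directory instead. If that's right, a lag tool that compares a consumer group's committed offsets with the log-end offsets would report ever-growing lag even while the query keeps up. Here is a toy model of that in plain Java (no Spark or Kafka classes; LagSketch and simulate are hypothetical names, and the never-committed offset is treated as 0):

```java
// Toy model only: no Spark or Kafka classes, and every name is hypothetical.
// It compares the true lag with the lag a consumer-group tool would report
// for one partition whose consumer never commits offsets back to Kafka.
class LagSketch {

    // Returns {trueLag, reportedLag} after `ticks` rounds in which the
    // producer appends `producedPerTick` records and the query processes all
    // of them, tracking its position only in its own checkpoint.
    static long[] simulate(int ticks, long producedPerTick) {
        long logEnd = 0;          // log-end offset of the partition
        long processed = 0;       // position recorded in the checkpoint
        final long committed = 0; // never advanced: nothing is committed to Kafka
        for (int t = 0; t < ticks; t++) {
            logEnd += producedPerTick; // producer appends new records
            processed = logEnd;        // the query keeps up completely
        }
        long trueLag = logEnd - processed;
        long reportedLag = logEnd - committed; // what a lag tool would show
        return new long[] {trueLag, reportedLag};
    }

    public static void main(String[] args) {
        for (int ticks : new int[] {1, 10, 100}) {
            long[] lag = simulate(ticks, 100);
            System.out.printf("ticks=%d trueLag=%d reportedLag=%d%n",
                    ticks, lag[0], lag[1]);
        }
    }
}
```

In this model the true lag stays at 0 while the reported lag grows by 100 per tick. If the real system behaves this way, the query's own progress reports (numInputRows, processedRowsPerSecond) might be a better signal than consumer-group lag.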

In terms of 2, why would I be seeing the same log messages over and over?
It doesn't make sense to me: wouldn't each message be transformed only once
and its offset committed?
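On 2, one explanation I'm considering, sketched as a toy model in plain Java (not Spark's actual code; ReplaySketch, runBatch, and recover are made-up names). As I understand the checkpoint design, each micro-batch's offset range is written to the checkpoint before the batch runs, and the batch is marked committed only after it finishes, so a batch interrupted in between is replayed with the same range on restart. Retried tasks would re-run the map function in the same way, and the Kafka sink is documented as at-least-once, so replays can also duplicate output:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of checkpoint-based offset tracking in a micro-batch engine.
// NOT Spark's implementation; all names are hypothetical.
class ReplaySketch {
    final Map<Long, long[]> offsetLog = new HashMap<>();  // batchId -> [start, end)
    final Map<Long, Boolean> commitLog = new HashMap<>(); // batchId -> finished
    final List<String> transformLog = new ArrayList<>();  // stands in for executor logs
    final List<String> sink = new ArrayList<>();          // stands in for the output topic

    // Runs one batch. If crashBeforeCommit is true, the output is written but
    // the process "dies" before the batch is recorded as committed.
    void runBatch(long batchId, long start, long end, boolean crashBeforeCommit) {
        offsetLog.putIfAbsent(batchId, new long[] {start, end}); // recorded before processing
        long[] range = offsetLog.get(batchId);
        for (long off = range[0]; off < range[1]; off++) {
            transformLog.add("transforming offset " + off); // the log line from transform()
            sink.add("transformed-" + off);
        }
        if (crashBeforeCommit) {
            return; // simulated failure between the sink write and the commit
        }
        commitLog.put(batchId, true);
    }

    // On restart, every batch in the offset log without a commit is replayed
    // with exactly the same offset range.
    void recover() {
        List<Long> pending = new ArrayList<>();
        for (Long batchId : offsetLog.keySet()) {
            if (!commitLog.containsKey(batchId)) {
                pending.add(batchId);
            }
        }
        for (Long batchId : pending) {
            long[] range = offsetLog.get(batchId);
            runBatch(batchId, range[0], range[1], false);
        }
    }

    public static void main(String[] args) {
        ReplaySketch engine = new ReplaySketch();
        engine.runBatch(0, 0, 3, false); // batch 0 completes normally
        engine.runBatch(1, 3, 5, true);  // batch 1 writes output, then crashes
        engine.recover();                // restart: batch 1 is replayed
        engine.transformLog.forEach(System.out::println);
    }
}
```

Here offsets 3 and 4 are transformed (and logged) twice. That alone would not explain 20+ identical lines, but repeated restarts or task retries on a single batch might.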

If anything stands out as incorrect, or is something you've seen before,
please let me know. This is rather new to me, and my code seems to follow
the same pattern as other examples I see across the net.

Here's a redacted snippet of my stream:

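import org.apache.kafka.clients.consumer.RoundRobinAssignor;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders;

// spark is an existing SparkSession; transform(String) is my own method
spark.readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "XXXXX")
        .option("kafka.partition.assignment.strategy",
                RoundRobinAssignor.class.getName())
        .option("subscribe", "XXXX")
        // Only used when the query starts with an empty checkpoint;
        // afterwards it resumes from the offsets stored in the checkpoint
        .option("startingOffsets", "earliest")
        .load()
        // Kafka delivers value as binary, so cast it before mapping to String
        .selectExpr("CAST(value AS STRING)")
        .as(Encoders.STRING())
        .map((MapFunction<String, String>) value -> transform(value),
                Encoders.STRING())
        .writeStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "XXXXX")
        .option("topic", "XXXXX")
        .outputMode("append")
        .option("checkpointLocation", "/checkpoints/testCheckpoint")
        .start()
        .awaitTermination();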

Thanks!
Austin
