Hey Spark experts,

After listening to some of you, and to the presentations at Spark Summit in SF, I am transitioning from DStreams to Structured Streaming; however, I am seeing some weird results.
My use case is as follows: I am reading a stream from a Kafka topic, transforming each message, and writing the transformed message to another Kafka topic. While running my stream, I can see the transformed messages on the output topic, so I know the basic structure of my stream is running as intended. Inside my transformation, I am logging the total transform time as well as the raw message being transformed. (This is Java, by the way.) The two weird things I am seeing:

1) The consumer lag for this particular consumer group on the input topic is increasing. This does not make sense to me - looking at the transform times in the logs, the stream should easily be able to handle the incoming feed. To give an example, the transform times are < 10 ms per record, and the sample of data does not contain > 100 messages per second. The stream should be reducing consumer lag as it runs (especially considering multiple workers and partitions).

2) I am seeing the same transformation log messages over and over in the Dataproc Spark cluster logs. For example, I am currently looking at my logs, and the last 20+ log messages are exactly the same.

I thought 2 might be due to offsets not being handled correctly, but I am seeing a reasonable range of transformed messages on the target topic, and I'm using Spark's built-in checkpointing to handle the offsets for me. Still, on 2: why would I be seeing the same log messages over and over? It doesn't make sense to me - wouldn't each message only be transformed once and its offset committed?
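To sanity-check my own throughput claim, here is the back-of-the-envelope math I did, as a tiny standalone sketch (the class and method names are mine, and the figures are just the worst-case numbers from my logs above):

```java
public class LagCheck {
    // Fraction of one task's time consumed by the stream:
    // arrival rate (msgs/sec) * processing cost (sec/msg).
    static double utilization(double msgsPerSec, double secsPerMsg) {
        return msgsPerSec * secsPerMsg;
    }

    public static void main(String[] args) {
        // Worst-case figures from my logs: 100 msgs/sec, 10 ms per record.
        double u = utilization(100.0, 0.010);
        System.out.println("single-task utilization = " + u);
        // >= 1.0 would mean a single task/partition cannot keep up on its own,
        // and lag would grow unless the work is spread across partitions.
        System.out.println(u >= 1.0
            ? "one task alone would fall behind"
            : "one task keeps up");
    }
}
```

Even at worst case this works out to about 1.0, i.e. borderline for a single task, but with multiple partitions and workers I'd still expect the lag to shrink, not grow.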
If anything stands out as incorrect, or as something you've seen before, please let me know - this is rather new to me, and my code seems to follow the same pattern as other examples I've seen across the net.

Here's a redacted snippet of my stream:

spark.readStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", "XXXXX")
    .option("kafka.partition.assignment.strategy", RoundRobinAssignor.class.getName())
    .option("subscribe", "XXXX")
    .option("startingOffsets", "earliest")
    .load()
    .select("value")
    .as(Encoders.STRING())
    .map((MapFunction<String, String>) value -> transform(value), Encoders.STRING())
    .writeStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", "XXXXX")
    .option("topic", "XXXXX")
    .outputMode("append")
    .option("checkpointLocation", "/checkpoints/testCheckpoint")
    .start()
    .awaitTermination();

Thanks!
Austin