Hi Austin,

A few questions:

   1. How many partitions do the Kafka topics used for input and output
   data have?
   2. In the writeStream, if you prefer a micro-batching strategy, I would
   recommend using "trigger" with a defined interval,
   3. along with setting "maxOffsetsPerTrigger" in the Kafka readStream
   options, which lets you cap the number of messages consumed per trigger.
   (This helps keep the executor/memory usage of the cluster within the
   expected threshold; see the sketch below.)
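
As a rough illustration only (not your exact job): the bootstrap servers and
topic names below are placeholders, "spark" is your SparkSession, transform()
is your own function, and the 1-minute interval and 10000-offset cap are
example values to tune for your workload.

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.streaming.Trigger;

// Read side: maxOffsetsPerTrigger caps how many records each micro-batch pulls.
Dataset<String> transformed = spark.readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "host:9092")
        .option("subscribe", "input-topic")
        .option("startingOffsets", "earliest")
        .option("maxOffsetsPerTrigger", "10000")
        .load()
        .selectExpr("CAST(value AS STRING)")
        .as(Encoders.STRING())
        .map((MapFunction<String, String>) value -> transform(value),
                Encoders.STRING());

// Write side: a fixed processing-time trigger makes batch sizes predictable.
transformed.writeStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "host:9092")
        .option("topic", "output-topic")
        .option("checkpointLocation", "/checkpoints/testCheckpoint")
        .trigger(Trigger.ProcessingTime("1 minute"))
        .outputMode("append")
        .start()
        .awaitTermination();  // throws StreamingQueryException; declare or handle it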

For the repeated log messages, look at the streaming query progress that
Spark publishes in your logs. This progress report contains a lot of metrics
and should be your first diagnostic for identifying issues.
With a Kafka stream, the progress report includes the "startOffset" and
"endOffset" values per batch, listed per topic-partition, so you can see
exactly which offset range each trigger of the streaming query processed.
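
If it helps, here is a minimal sketch (again assuming your SparkSession is
named "spark"; register it before starting the query) that prints each
batch's progress report, including those per-partition offset ranges:

import org.apache.spark.sql.streaming.StreamingQueryListener;

// Prints every micro-batch's progress (numInputRows, processedRowsPerSecond,
// and the per-source startOffset/endOffset maps) to the driver output.
spark.streams().addListener(new StreamingQueryListener() {
    @Override
    public void onQueryStarted(QueryStartedEvent event) {
        // no-op
    }

    @Override
    public void onQueryProgress(QueryProgressEvent event) {
        System.out.println(event.progress().prettyJson());
    }

    @Override
    public void onQueryTerminated(QueryTerminatedEvent event) {
        // no-op
    }
});

Alternatively, if you keep a handle on the StreamingQuery returned by start()
instead of chaining awaitTermination() directly, query.lastProgress() gives
you the most recent batch's report as well.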


Akshay Bhardwaj
+91-97111-33849


On Tue, May 7, 2019 at 8:02 PM Austin Weaver <aus...@flyrlabs.com> wrote:

> Hey Spark Experts,
>
> After listening to some of you, and the presentations at Spark Summit in
> SF, I am transitioning from DStreams to Structured Streaming; however, I
> am seeing some weird results.
>
> My use case is as follows: I am reading in a stream from a kafka topic,
> transforming a message, and writing the transformed message to another
> kafka topic.
>
> While running my stream, I can see the transformed messages on the output
> topic so I know the basic structure of my stream seems to be running as
> intended.
>
> Inside my transformation, I am logging the total transform time as well as
> the raw message being transformed. (Java by the way)
>
> The 2 weird things I am seeing:
> 1) I am seeing that the consumer lag for this particular consumer group on
> the input topic is increasing. This does not make sense to me - looking at
> the transform time from the logs, it should easily be able to handle the
> incoming feed. To give an example, the transform times are < 10 ms per
> record and the sample data does not exceed 100 messages per second.
> The stream should be reducing consumer lag as it runs (especially
> considering multiple workers and partitions)
>
> 2) I am seeing the same transformation log messages over and over in the
> Dataproc Spark cluster logs. For example, I am currently looking at my logs
> and the last 20+ log messages are exactly the same.
>
> I thought 2 may be due to offsets not being handled correctly, but I am
> seeing a reasonable range of transformed messages on the target topic, and
> I'm using Spark's built-in checkpointing to handle the offsets for me.
>
> In terms of 1, why would I be seeing the same log messages over and over?
> It doesn't make sense to me - wouldn't the message only be transformed once
> and its offset committed?
>
> If anything stands out as incorrect, or matches something you've seen,
> please let me know - this is rather new to me and my code seems to follow
> the same pattern as other examples I see across the net.
>
> Here's a redacted snippet of my stream:
>
> spark.readStream().format("kafka").option("kafka.bootstrap.servers",
> "XXXXX")
>         .option("kafka.partition.assignment.strategy",
> RoundRobinAssignor.class.getName())
>         .option("subscribe", "XXXX")
>         .option("startingOffsets", "earliest")
>         .load()
>         .select("value")
>         .as(Encoders.STRING())
>         .map((MapFunction<String, String>) value -> transform(value),
> Encoders.STRING())
>         .writeStream()
>         .format("kafka")
>         .option("kafka.bootstrap.servers", "XXXXX")
>         .option("topic", "XXXXX")
>         .outputMode("append")
>         .option("checkpointLocation", "/checkpoints/testCheckpoint")
>         .start()
>         .awaitTermination();
>
> Thanks!
> Austin
>
