Hi all,

Sorry if this isn't the right place to ask basic questions - I'm at the end
of my rope here. If there's a better venue for this kind of question, please
point me to it.

I'm trying to continuously read from a Kafka topic and send the number of rows 
Spark has received to a metric tracking service. I'm expecting an unbounded 
stream of input data, so I need to send the number of rows periodically and 
will sum them within the metric tracking service. I thought counting per 
DataFrame or over non-overlapping periods of time would make sense, but I
haven't had luck with either.
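
To make the second idea concrete, the kind of windowed count I have in mind
looks roughly like this (a sketch only - the one-minute window and the console
sink are placeholders, and df is the Kafka stream from the program further
down):

import org.apache.spark.sql.functions.window

// Count rows per non-overlapping one-minute event-time window on the Kafka
// timestamp column (sketch; the window size is arbitrary)
val countsPerWindow = df
    .groupBy(window($"timestamp", "1 minute"))
    .count()

val windowQuery = countsPerWindow
    .writeStream
    .outputMode("update")
    .format("console")
    .start()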

Whatever approach I take, I inevitably need to call count(), which triggers
Spark to execute the DAG and terminate the application (presumably because the
count() action has completed). What I really need is for my Spark application
to receive data indefinitely, count the rows periodically, and send the
count(s) to the metric tracker.
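
Spelled out, the behaviour I'm after is roughly the following (this uses the
query and metricTracker values from the program below, and I'm only guessing
that polling lastProgress like this is a sensible way to do it):

// Keep the driver alive after start() and periodically forward the number of
// input rows Spark reports for the most recent trigger (untested guess)
while (query.isActive) {
    val progress = query.lastProgress   // null until the first trigger finishes
    if (progress != null) {
        metricTracker.track("numberOfRows", progress.numInputRows)
    }
    Thread.sleep(5000)
}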

My current program looks something like this:

import org.apache.spark.sql.streaming.Trigger
import spark.implicits._

val df = spark
    .readStream
    .format("kafka")
    .<other options>
    .load()
    .select($"partition", $"timestamp", $"offset", $"value" cast "string")

val metricTracker = new metricTracker()

//Track the metric, second parameter needs to be type Long
metricTracker.track("numberOfRows", df.count())

//Output data to console
val query = df
    .writeStream
    .outputMode("append")
    .format("console")
    .trigger(Trigger.Continuous("5 seconds"))
    .start()

If I remove the metricTracker lines, the application receives data
indefinitely and prints it to the console. When I add the
metricTracker.track(...) call with df.count(), the count executes and the
program terminates very quickly. Any ideas on how I can send the number of
rows Spark is receiving/processing from a stream with no end?
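
The other thing I've wondered about, but haven't been able to verify, is
whether foreachBatch is meant for exactly this, i.e. counting inside each
micro-batch with a normal batch action; something along these lines (again
just a sketch, and it drops the continuous trigger since I don't know whether
foreachBatch works with it):

// Guess: count each micro-batch and forward the result, instead of calling
// count() on the streaming DataFrame itself
val batchQuery = df
    .writeStream
    .foreachBatch { (batch: org.apache.spark.sql.DataFrame, batchId: Long) =>
        metricTracker.track("numberOfRows", batch.count())
    }
    .start()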

Thanks,
Basil
