Hi all,
Sorry if this isn't the right place to ask basic questions, but I'm at the end
of my rope here - if there's a better place to get help with this, please point
me to it.
I'm trying to continuously read from a Kafka topic and send the number of rows
Spark has received to a metric tracking service. I'm expecting an unbounded
stream of input data, so I need to report the row count periodically and sum
the counts within the metric tracking service. I thought counting per
DataFrame or over non-overlapping periods of time would make sense, but I
haven't had luck with either approach.
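For reference, the non-overlapping-window variant I tried looked roughly like
the sketch below (the Kafka broker and topic options are placeholders). The
per-window counts do show up on the console, but I still don't see how to pull
each count out as a plain Long to hand to the metric tracker:

import org.apache.spark.sql.functions.window
import spark.implicits._

// Same Kafka source as in the full program further down; options are placeholders
val events = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<brokers>")
  .option("subscribe", "<topic>")
  .load()
  .select($"partition", $"timestamp", $"offset", $"value".cast("string"))

// Count rows per non-overlapping 5-second window on the Kafka timestamp
val windowedCounts = events
  .groupBy(window($"timestamp", "5 seconds"))
  .count()

// Aggregations aren't supported with the continuous trigger, so this uses the
// default micro-batch trigger; "complete" mode re-emits all window counts
val query = windowedCounts
  .writeStream
  .outputMode("complete")
  .format("console")
  .start()

query.awaitTermination()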
Whatever approach I take, I inevitably need to call count(), which triggers
Spark to execute the DAG and then terminate the application (presumably because
the count() action has completed). What I really need is for my Spark
application to receive data indefinitely, count the rows periodically, and send
the counts to the metric tracker.
My current program looks something like this:
import org.apache.spark.sql.streaming.Trigger
import spark.implicits._ // for the $"..." column syntax

val df = spark
  .readStream
  .format("kafka")
  .<other options>
  .load()
  .select($"partition", $"timestamp", $"offset", $"value".cast("string"))

val metricTracker = new metricTracker()

// Track the metric; the second parameter needs to be of type Long
metricTracker.track("numberOfRows", df.count())

// Output the data to the console with a 5-second continuous trigger
val query = df
  .writeStream
  .outputMode("append")
  .format("console")
  .trigger(Trigger.Continuous("5 seconds"))
  .start()

query.awaitTermination()
If I remove the metricTracker lines, the query receives data indefinitely and
prints it to the console. When I add the call to df.count(), the program
executes and terminates very quickly. Any ideas on how I can send the number of
rows Spark is receiving/processing from a stream with no end?
Thanks,
Basil