Re: Best way to emit custom metrics to Prometheus in spark structured streaming

2020-11-04 Thread meetwes
So I tried it again in standalone mode (spark-shell) and the df.observe()
functionality works: sum, count, conditional aggregations using 'when', etc.
all work in spark-shell. But with spark-on-k8s in cluster mode, only lit()
works as the aggregation column; no other aggregation (count, sum, etc.) does.
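
For reference, here is a minimal sketch of the kind of observe() call being compared (the metric name and the dropped-records expression are just placeholders built on the (id, key, value) columns from the original post):

import org.apache.spark.sql.functions.{col, count, lit, sum, when}

// Attach named observed metrics to the streaming dataframe.
val observed = df.observe(
  "custom_metrics",
  lit(1).as("constant_metric"),    // plain literal: reportedly works in both modes
  count(lit(1)).as("row_count"),   // real aggregations: reportedly 0 on spark-on-k8s cluster mode
  sum(when(col("key") === "droppedRecords", col("value").cast("long"))).as("dropped_sum")
)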






Re: Best way to emit custom metrics to Prometheus in spark structured streaming

2020-11-04 Thread meetwes
Hi, thanks for the reply. I tried it out today but I am unable to get it to
work in cluster mode: the aggregation result is always 0. It works fine in
standalone mode with spark-shell, but with Spark on Kubernetes in cluster
mode it doesn't.
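
For context, the observed metrics surface on the driver through the streaming query progress, e.g. via a StreamingQueryListener roughly like this (the observation name matches the sketch above; the Prometheus push itself is left as a stub):

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    // observedMetrics is keyed by the name passed to df.observe(...)
    Option(event.progress.observedMetrics.get("custom_metrics")).foreach { metrics =>
      val droppedSum = metrics.getAs[Long]("dropped_sum")
      // push droppedSum to Prometheus here (e.g. update a gauge or a Pushgateway client)
      println(s"dropped_sum = $droppedSum")
    }
  }
})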






Best way to emit custom metrics to Prometheus in spark structured streaming

2020-11-02 Thread meetwes
Hi, I am looking for the right approach to emit custom metrics from a Spark
structured streaming job.

*Actual Scenario:*
I have an aggregated dataframe, let's say with (id, key, value) columns. One
of the KPIs could be 'droppedRecords', and the corresponding value column has
the number of dropped records. I need to filter the rows whose key is
'droppedRecords' and compute the sum of their value column.
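
For concreteness, the computation I mean looks roughly like this (the dataframe name and the cast to long are illustrative):

import org.apache.spark.sql.functions.{col, sum}

// Sum the value column over the rows whose key marks dropped records.
val droppedRecordsTotal = aggregatedDf
  .filter(col("key") === "droppedRecords")
  .agg(sum(col("value").cast("long")).as("dropped_records_total"))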

*Challenges:*
1) Only one streaming query should be used so the metrics stay accurate (1
readStream and 1 writeStream). If the metrics are emitted in a separate
query, it can cause inconsistencies because the watermark time varies
between the query that does the aggregation and the one that only computes
the metrics.

*I evaluated some of the approaches:*
1) _foreachBatch sink:_ This works for emitting metrics, but there are other
bugs, e.g. the numOutputRows emitted in the logs is always -1.
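
Roughly the shape of what I tried with foreachBatch (the dataframe name is illustrative; the Prometheus push and the actual sink write are left as stubs):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, sum}

// Compute the dropped-records total eagerly on each micro-batch.
val emitDroppedMetric: (DataFrame, Long) => Unit = (batch, batchId) => {
  val result = batch
    .filter(col("key") === "droppedRecords")
    .agg(sum(col("value").cast("long")).as("dropped_sum"))
    .head()
  if (!result.isNullAt(0)) {
    // push result.getLong(0) to Prometheus here (gauge / Pushgateway); stubbed with a log line
    println(s"batch $batchId droppedRecords = ${result.getLong(0)}")
  }
  // the real sink write for the micro-batch would also go here
}

val query = aggregatedDf.writeStream
  .foreachBatch(emitDroppedMetric)
  .start()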

2) _Using accumulators:_
import org.apache.spark.util.LongAccumulator
import spark.implicits._

// Kpi is a placeholder for the aggregated (id, key, value) row type; field types are assumed.
case class Kpi(id: String, key: String, value: String)

val dropCounts: LongAccumulator = new LongAccumulator
spark.sparkContext.register(dropCounts, "Drop Counts Accumulator")
df.as[Kpi].map { row =>
  dropCounts.add(row.value.toLong)   // runs on the executors
  row
}  // this mapped dataset is then written out with the single writeStream
This approach seems to hit a bug in Spark: the executors add to the
accumulator correctly, but the driver's count is always 0.

3) _Using mapGroupsWithState:_ This requires an action on the aggregated
dataframe to retrieve the metrics, and therefore creates another streaming query.

I am using Spark 3.0.1. What would be the best way to implement custom
metrics?


