Re: Best way to emit custom metrics to Prometheus in spark structured streaming
So I tried it again in standalone mode (spark-shell), and the df.observe() functionality works there. I tried sum, count, and conditional aggregations using when(), and all of them work in spark-shell. With Spark on Kubernetes in cluster mode, however, only lit() works as the aggregation column. No other aggregation, including count and sum, works.

-- Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
To unsubscribe e-mail: user-unsubscr...@spark.apache.org
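For readers following the thread, here is a minimal sketch of the df.observe() pattern being discussed, applied to the 'droppedRecords' KPI from the original question. It is an illustration under assumptions, not a fix for the cluster-mode behaviour reported above: the object name ObserveSketch, the metric group name "kpi_metrics", the helper names, and the cast of the value column to long are all hypothetical, and the println stands in for whatever Prometheus client call is actually used.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, lit, sum, when}
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

object ObserveSketch {
  // Hypothetical name for the group of observed metrics.
  val MetricGroup = "kpi_metrics"

  // Attach the metric to the same Dataset that is written out, so that
  // no second streaming query is needed. Assumes (id, key, value) columns
  // and that value can be cast to long.
  def withDropMetric(df: DataFrame): DataFrame =
    df.observe(
      MetricGroup,
      sum(
        when(col("key") === "droppedRecords", col("value").cast("long"))
          .otherwise(lit(0L))
      ).as("droppedRecords")
    )

  // Read the observed metric on the driver after each micro-batch.
  def registerListener(spark: SparkSession): Unit =
    spark.streams.addListener(new StreamingQueryListener {
      override def onQueryStarted(event: QueryStartedEvent): Unit = ()

      override def onQueryProgress(event: QueryProgressEvent): Unit = {
        // observedMetrics is a java.util.Map[String, Row]; get returns null if absent.
        val row = event.progress.observedMetrics.get(MetricGroup)
        if (row != null) {
          val idx = row.fieldIndex("droppedRecords")
          // sum over an empty batch is null, so treat null as 0.
          val dropped = if (row.isNullAt(idx)) 0L else row.getLong(idx)
          // Placeholder: replace with the Prometheus client call of your choice.
          println(s"droppedRecords=$dropped")
        }
      }

      override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
    })
}
```

A possible usage would be to call ObserveSketch.registerListener(spark) once at startup and then write ObserveSketch.withDropMetric(aggregatedDf) with the existing writeStream, so the metric is computed inside the single query.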
Re: Best way to emit custom metrics to Prometheus in spark structured streaming
Hi,

Thanks for the reply. I tried it out today, but I am unable to get it to work in cluster mode: the aggregation result is always 0. It works fine in standalone mode with spark-shell, but not with Spark on Kubernetes in cluster mode.
Best way to emit custom metrics to Prometheus in spark structured streaming
Hi,

I am looking for the right approach to emit custom metrics from a Spark Structured Streaming job.

*Actual Scenario:*
I have an aggregated dataframe, let's say with (id, key, value) columns. One of the KPIs could be 'droppedRecords', and the corresponding value column holds the number of dropped records. I need to filter the rows whose KPI is 'droppedRecords' and compute the sum over their value column.

*Challenges:*
1) I need to use only one streaming query (1 readStream and 1 writeStream) so that the metrics are accurate. If the metrics are emitted in a separate query, inconsistencies can arise because the watermark time differs between the query that does the aggregation and the one that only collects the metrics.

*I evaluated some of the approaches:*
1) _foreachBatch sink:_ This works for emitting metrics, but there are other bugs. E.g., the numOutputRows emitted in the logs is always -1.

2) _Using accumulators:_

    val dropCounts: LongAccumulator = new LongAccumulator
    spark.sparkContext.register(dropCounts, "Drop Counts Accumulator")
    df.as[].map(row => {
      val value = row.value
      dropCounts.add(value.toLong)
    })

   This approach seems to hit a bug in Spark. The executor does add the value correctly, but the driver's count is always 0.

3) _Using mapGroupsWithState:_ This requires an action on the aggregated dataframe to retrieve the metrics, and therefore creates another streaming query.

I am using Spark 3.0.1. What would be the best way to implement custom metrics?