Exporting all Executor Metrics in Prometheus format in K8s cluster

2021-02-04 Thread Dávid Szakállas
I've been trying to set up monitoring for our Spark 3.0.1 cluster running in 
K8s. We are using Prometheus as our monitoring system, and we need both executor 
and driver metrics. My initial approach was to use the following configuration 
to expose both sets of metrics on the Spark UI:

{
'spark.ui.prometheus.enabled': 'true'
}

I was able to scrape http://<driver>:4040/metrics/prometheus/ for driver 
metrics and http://<driver>:4040/metrics/executors/prometheus/ for executor 
metrics. However, the executor endpoint only contains the metrics defined here: 
https://spark.apache.org/docs/latest/monitoring.html#executor-metrics, which is 
referred to as ExecutorSummary. I would like to get all the metrics from the 
Executor instance metric system: 
https://spark.apache.org/docs/latest/monitoring.html#component-instance--executor

I am not sure if these are available on the driver at all, so I've been 
thinking of scraping the executors directly instead. PrometheusServlet seems to 
be meant for this purpose; however, the executors aren't running web servers, 
and I can't find a configuration setting that would open up a port on the 
executor container so that it can be scraped. So my current plan is to write a 
custom sink that exports the metrics in the Prometheus format to a local file, 
and to run a sidecar container with an nginx that serves that static file. The 
nginx endpoint can in turn be scraped by Prometheus. Am I overcomplicating 
this? Is there a simpler approach?
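
To be concrete, this is roughly the sink I have in mind. It is only a sketch: 
the class and property names are made up, the 3-arg constructor is what Spark 
3.0.x uses when instantiating sinks via reflection, and since the Sink trait is 
private[spark] the class has to live under the org.apache.spark.metrics.sink 
package.

package org.apache.spark.metrics.sink

import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths, StandardCopyOption}
import java.util.Properties
import java.util.concurrent.{Executors, TimeUnit}

import com.codahale.metrics.MetricRegistry

import org.apache.spark.SecurityManager

// Periodically renders the executor's MetricRegistry in Prometheus text format
// into a local file that the nginx sidecar serves for scraping.
class PrometheusFileSink(
    val property: Properties,
    val registry: MetricRegistry,
    securityMgr: SecurityManager) extends Sink {

  private val outputPath = property.getProperty("path", "/metrics/prometheus.txt")
  private val periodSeconds = property.getProperty("period", "10").toLong
  private val scheduler = Executors.newSingleThreadScheduledExecutor()

  // Prometheus metric names may only contain [a-zA-Z0-9_:]
  private def sanitize(name: String): String = name.replaceAll("[^a-zA-Z0-9_:]", "_")

  private def render(): String = {
    val sb = new StringBuilder
    registry.getGauges.forEach { (name, gauge) =>
      gauge.getValue match {
        case n: Number => sb.append(s"${sanitize(name)} ${n.doubleValue()}\n")
        case _ => // skip non-numeric gauges
      }
    }
    registry.getCounters.forEach { (name, counter) =>
      sb.append(s"${sanitize(name)}_total ${counter.getCount}\n")
    }
    sb.toString
  }

  private def writeSnapshot(): Unit = {
    val tmp = Paths.get(outputPath + ".tmp")
    Files.write(tmp, render().getBytes(StandardCharsets.UTF_8))
    // write-then-rename so the sidecar never serves a half-written file
    Files.move(tmp, Paths.get(outputPath), StandardCopyOption.REPLACE_EXISTING)
  }

  override def start(): Unit = {
    scheduler.scheduleAtFixedRate(() => writeSnapshot(),
      periodSeconds, periodSeconds, TimeUnit.SECONDS)
  }

  override def stop(): Unit = scheduler.shutdown()

  override def report(): Unit = writeSnapshot()
}

It would then be wired up for executors only, e.g. with 
spark.metrics.conf.executor.sink.prometheusfile.class=org.apache.spark.metrics.sink.PrometheusFileSink 
plus the path/period properties.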

Thanks,
David Szakallas




Custom Metrics Source -> Sink routing

2020-09-30 Thread Dávid Szakállas
Is there a way to customize which metrics sources are routed to which sinks?

If I understood the docs correctly, there are some global switches for enabling 
sources, e.g. spark.metrics.staticSources.enabled and 
spark.metrics.executorMetricsSource.enabled.

We would like to specify Source -> Sink routing on a namespace basis. The use 
case is the following: we would like to have Prometheus monitoring for our 
Spark jobs. The large majority of metrics are to be exposed through the 
experimental Prometheus endpoint for direct scraping. However, we would like to 
expose a select set of metrics through a push gateway, as we want to guarantee 
that these metrics are scraped: for example, a counter for the number of rows 
written to each inserted table. These are reported mostly at the end of a batch 
ingestion job, so a push model is a better fit. We created a dedicated 
DropWizard MetricRegistry for these custom metrics and are using 
https://github.com/banzaicloud/spark-metrics for pushing the metrics to the 
PGW. However, pushing all the metrics to the gateway overloads it, and 
duplicating everything there is unnecessary.

Ideally there would be a way to route the batch-like metrics to this sink and 
have the rest of the gauges exposed through the normal Prometheus sink.
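
If that requires custom code, the direction I am considering is to keep the 
must-be-delivered metrics in their own registry and push only that registry at 
the end of the job, leaving everything else to the regular Prometheus endpoint. 
A rough sketch using the Prometheus Java client directly (the object, metric, 
job name and gateway address are placeholders):

import com.codahale.metrics.MetricRegistry
import io.prometheus.client.CollectorRegistry
import io.prometheus.client.dropwizard.DropwizardExports
import io.prometheus.client.exporter.PushGateway

object BatchMetrics {
  // only the custom, batch-style metrics live in this registry
  val registry = new MetricRegistry()
  val rowsWritten = registry.counter("rows_written_total")

  // bridge the DropWizard registry to Prometheus and push it in one shot
  def pushToGateway(gatewayAddress: String, jobName: String): Unit = {
    val collectorRegistry = new CollectorRegistry()
    collectorRegistry.register(new DropwizardExports(registry))
    new PushGateway(gatewayAddress).pushAdd(collectorRegistry, jobName)
  }
}

// at the end of the ingestion job:
//   BatchMetrics.rowsWritten.inc(numRowsWritten)
//   BatchMetrics.pushToGateway("pushgateway:9091", "my-ingestion-job")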

Is this something that can currently be solved with configuration, or does it 
require custom code on the plugin side?

Thanks,
David Szakallas





Secrets in Spark apps

2020-07-27 Thread Dávid Szakállas
Hi folks,

Do you know what the best method is for passing secrets to Spark operations, 
e.g. for doing encryption, salting with a secret before hashing, etc.?
I have a few ideas off the top of my head.

The secret's source:
- environment variable
- config property
- remote service accessed through an API.

Passing to the executors:
1. Driver resolves the secret
   a. it passes it to the encryption function as an argument, which ends up as 
an argument to a UDF/gets interpolated in the expression’s generated code.
   b. it passes it to the encryption function as a literal expression. For 
security, I can create a SecretLiteral expression that redacts it from the 
pretty printed and SQL versions. Are there any other concerns here?

2. Executors resolve the secret
   a. e.g. read it from an env/config/service; only the env var name/property 
name/path/URI is passed as part of the plan. I need to cache the secret on the 
executor to avoid a performance hit, especially in the remote service case.
   b. Similarly to (1.b), I can create an expression that resolves the secret 
during execution.

In (1) the secret is passed as part of the plan, so the RPC connections have to 
be encrypted if an attacker could sniff the network for secrets. 1.b and 2.b 
are superior for composing with existing expressions, e.g. 
`sha1(concat(colToMask, secretLit("mySecret")))` for masking a column 
deterministically using a cryptographic hash function and a secret salt. (2) 
might involve a more complicated design than (1).
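
To make 2.a concrete, here is a rough sketch of what I mean, using a plain UDF 
rather than a dedicated Catalyst expression (names are made up). Only the env 
var name is visible to the driver and the plan; the lazy val caches the lookup 
once per executor JVM:

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{sha1, udf}

object SecretSalt {
  // resolved lazily on each executor; only the variable name appears in code
  lazy val salt: String =
    sys.env.getOrElse("MASKING_SALT", sys.error("MASKING_SALT is not set"))

  private val appendSalt = udf((value: String) =>
    if (value == null) null else value + salt)

  // deterministic masking: sha1(value || salt), analogous to
  // sha1(concat(colToMask, secretLit("mySecret"))) from 1.b
  def mask(colToMask: Column): Column = sha1(appendSalt(colToMask))
}

// usage: df.withColumn("masked_id", SecretSalt.mask(df("user_id")))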

If you can point me to existing work in this space it would be a great help!

Thanks in advance,
David Szakallas






Dataset schema incompatibility bug when reading column partitioned data

2019-03-29 Thread Dávid Szakállas
We observed the following bug on Spark 2.4.0:

scala> 
spark.createDataset(Seq((1,2))).write.partitionBy("_1").parquet("foo.parquet")

scala> import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

scala> val schema = StructType(Seq(StructField("_1", IntegerType), 
StructField("_2", IntegerType)))

scala> spark.read.schema(schema).parquet("foo.parquet").as[(Int, Int)].show
+---+---+
| _2| _1|
+---+---+
|  2|  1|
+---+---+

That is, when reading column-partitioned Parquet files, the explicitly 
specified schema is not adhered to; instead, the partitioning columns are 
appended to the end of the column list. This is a quite severe issue, as some 
operations, such as union, fail if the columns are in a different order in the 
two datasets. Thus we have to work around the issue with a select:

// reorder the columns to match the declared schema
val columnNames = schema.fields.map(_.name)
ds.select(columnNames.head, columnNames.tail: _*)
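
For the union case specifically, unionByName (available since Spark 2.3) 
matches columns by name instead of by position, which also sidesteps the 
ordering issue (ds1 and ds2 stand for the two datasets):

ds1.unionByName(ds2)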


Thanks, 
David Szakallas
Data Engineer | Whitepages, Inc.

Support nested keys in DataFrameWriter.bucketBy

2018-10-15 Thread Dávid Szakállas
Currently (in Spark 2.3.1) we cannot bucket DataFrames by nested columns, e.g.

df.write.bucketBy(10, "key.a").saveAsTable("junk")

will result in the following exception:

org.apache.spark.sql.AnalysisException: bucket column key.a is not defined in 
table junk, defined table columns are: key, value;
at 
org.apache.spark.sql.catalyst.catalog.CatalogUtils$$anonfun$org$apache$spark$sql$catalyst$catalog$CatalogUtils$$normalizeColumnName$2.apply(ExternalCatalogUtils.scala:246)
at 
org.apache.spark.sql.catalyst.catalog.CatalogUtils$$anonfun$org$apache$spark$sql$catalyst$catalog$CatalogUtils$$normalizeColumnName$2.apply(ExternalCatalogUtils.scala:246)
at scala.Option.getOrElse(Option.scala:121)
…
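
A possible workaround (sketch only; the flattened column name is illustrative) 
is to promote the nested field to a top-level column and bucket by that 
instead:

import org.apache.spark.sql.functions.col

df.withColumn("key_a", col("key.a"))
  .write
  .bucketBy(10, "key_a")
  .saveAsTable("junk")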

Are there plans to change this anytime soon?

Thanks, David








updateStateByKey for window batching

2016-08-22 Thread Dávid Szakállas
Hi!

I'm curious about the fault-tolerance properties of stateful streaming 
operations. I am specifically interested in updateStateByKey.
What happens if a node fails during processing? Is the state recoverable?

Our use case is the following: we receive messages from a message queue 
requesting an update of the resource specified in the message. When such an 
update request arrives, we wait a specific amount of time; if another update 
message for the same resource arrives within that window, we batch them 
together, and once the window since the first message has elapsed we apply a 
single update to the resource. We were thinking of using updateStateByKey with 
the resource identifier as the key.

It is important to guarantee exactly-once processing of the messages: every 
update should happen, and none more than once.
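
To make it concrete, a rough sketch of what I have in mind (all names and the 
window length are illustrative): the state per resource holds the pending 
updates and the timestamp of the first one, checkpointing is what should make 
the state recoverable, and since output actions are at-least-once the external 
update itself would have to be idempotent.

import org.apache.spark.streaming.dstream.DStream

// state kept per resource id: the pending updates and when the first arrived
case class Pending(firstSeen: Long, updates: Seq[String], ready: Boolean = false)

val waitWindowMs = 60 * 1000L  // illustrative batching window

// accumulate updates; mark the batch as ready once the window has elapsed,
// and drop the state one micro-batch after it has been flushed downstream
def updateFunc(newUpdates: Seq[String], state: Option[Pending]): Option[Pending] = {
  val now = System.currentTimeMillis()
  state match {
    case Some(p) if p.ready =>
      if (newUpdates.isEmpty) None else Some(Pending(now, newUpdates))
    case Some(p) =>
      val merged = p.copy(updates = p.updates ++ newUpdates)
      if (now - merged.firstSeen >= waitWindowMs) Some(merged.copy(ready = true))
      else Some(merged)
    case None =>
      if (newUpdates.isEmpty) None else Some(Pending(now, newUpdates))
  }
}

// updatesByResourceId is a DStream[(resourceId, updatePayload)]; the streaming
// context needs ssc.checkpoint(...) set for updateStateByKey to be recoverable
def readyBatches(updatesByResourceId: DStream[(String, String)]): DStream[(String, Pending)] =
  updatesByResourceId.updateStateByKey(updateFunc).filter { case (_, p) => p.ready }

// readyBatches(stream).foreachRDD(_.foreach { case (id, p) =>
//   applyBatchedUpdate(id, p.updates)  // hypothetical, must be idempotent
// })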

Is it a good way to go?

Cheers,

--
David Szakallas | Software Engineer, RisingStack
Monitoring with Trace: http://trace.risingstack.com 

http://risingstack.com  | http://blog.risingstack.com 

Twitter: @szdavid92






