Hi all, I am developing a Spark application that loads data into Cassandra using the Spark Cassandra Connector. I built a fat JAR containing all the dependencies and submitted it with spark-submit.
The data loads into Cassandra successfully, but I am not getting any metrics from the Spark Cassandra Connector. The executor logs show that the metrics sources configured by the following properties fail to initialize, with the error below.

Properties:
"spark.metrics.conf.driver.source.cassandra-connector.class": "org.apache.spark.metrics.CassandraConnectorSource"
"spark.metrics.conf.executor.source.cassandra-connector.class": "org.apache.spark.metrics.CassandraConnectorSource"

Error:
22/01/28 15:30:55 ERROR MetricsSystem: Source class org.apache.spark.metrics.CassandraConnectorSource cannot be instantiated
java.lang.ClassNotFoundException: org.apache.spark.metrics.CassandraConnectorSource
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.spark.util.Utils$.classForName(Utils.scala:235)
    at org.apache.spark.metrics.MetricsSystem$$anonfun$registerSources$1.apply(MetricsSystem.scala:182)
    at org.apache.spark.metrics.MetricsSystem$$anonfun$registerSources$1.apply(MetricsSystem.scala:179)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
    at org.apache.spark.metrics.MetricsSystem.registerSources(MetricsSystem.scala:179)
    at org.apache.spark.metrics.MetricsSystem.start(MetricsSystem.scala:101)
    at org.apache.spark.SparkEnv$.create(SparkEnv.scala:364)
    at org.apache.spark.SparkEnv$.createExecutorEnv(SparkEnv.scala:200)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$run$1.apply$mcV$sp(CoarseGrainedExecutorBackend.scala:228)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:65)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:64)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
    at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:64)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:188)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:293)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)

I am loading the data into Cassandra with the following code:

    import com.google.common.collect.ImmutableMap;
    import org.apache.spark.sql.SaveMode;

    // Append the renamed dataset to the configured keyspace and table
    cassandraTableDataset.toDF(cassandraTable.getRenamedColumns())
        .write()
        .format(sparkCassandraFormat)
        .options(ImmutableMap.of(
            cassandraKeyspaceString, getKeyspace(config),
            cassandraTableString, getTable(config)))
        .mode(SaveMode.Append)
        .save();

Because of some restrictions, I cannot copy the Spark Cassandra Connector JAR to every node in the cluster.

*Solutions tried:*

*Solution 1:* I set the spark.jars and spark.executor.extraClassPath options, but that did not work. As far as I can tell, the executor's SparkEnv (including its metrics system) is created before these JARs, or the fat application JAR, are fetched to the executor node.

*Solution 2:* Just before loading into Cassandra, I manually instantiated org.apache.spark.metrics.CassandraConnectorSource and registered it with the SparkEnv metrics system. This had no effect either; I assume the registration only happens on the driver.

*Solution 3:* I also tried setting the same properties with sparkEnvironment.getSparkSession().conf().set(), but that did not work either.
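To narrow this down, here is a rough plain-JDK Java sketch of the lookup that fails. According to the trace, the class is resolved in MetricsSystem.start, called from SparkEnv.createExecutorEnv while the executor backend starts up, and the lookup goes through sun.misc.Launcher$AppClassLoader, i.e. the executor JVM's launch classpath. The sketch is only a simplified imitation, not Spark's code: `MetricsSourceCheck`, `checkSources` and `isLoadable` are hypothetical names, and it ignores the `*` instance defaults and does not instantiate the class the way Spark does.

```java
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

// Hypothetical diagnostic, not Spark code: a simplified imitation of how the
// metrics system turns "spark.metrics.conf.<instance>.source.<name>.class"
// entries into reflective class lookups at startup.
public class MetricsSourceCheck {

    // Returns className -> whether the given class loader can load it, for every
    // source configured for the given instance ("driver" or "executor").
    // Simplification: the "*" (all instances) defaults are ignored here.
    public static Map<String, Boolean> checkSources(Properties conf, String instance,
                                                    ClassLoader loader) {
        String prefix = "spark.metrics.conf." + instance + ".source.";
        Map<String, Boolean> result = new TreeMap<>();
        for (String key : conf.stringPropertyNames()) {
            if (key.startsWith(prefix) && key.endsWith(".class")) {
                String className = conf.getProperty(key);
                result.put(className, isLoadable(className, loader));
            }
        }
        return result;
    }

    // Loads the class without initializing it; a missing class, or a missing
    // dependency such as its superclass, counts as "not loadable".
    public static boolean isLoadable(String className, ClassLoader loader) {
        try {
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        Properties conf = new Properties();
        conf.setProperty("spark.metrics.conf.executor.source.cassandra-connector.class",
                "org.apache.spark.metrics.CassandraConnectorSource");
        // The system class loader only sees the JVM's launch classpath,
        // which is what the AppClassLoader frames in the trace point to.
        ClassLoader loader = ClassLoader.getSystemClassLoader();
        checkSources(conf, "executor", loader).forEach((cls, ok) ->
                System.out.println(cls + (ok ? ": loadable" : ": ClassNotFoundException")));
    }
}
```

With only the JDK on the classpath, as in `main`, the connector class is reported as not loadable, which matches the ClassNotFoundException above. If this reading of the trace is right, it would also explain why Solution 3 had no visible effect: the lookup runs once, at executor startup, before any runtime conf().set() call.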
I was not sure whether these properties can be changed at runtime at all, but I tried anyway in the hope that it would update the executor configuration.

Spark version: 2.3.0
Scala version: 2.11
Spark Cassandra Connector: com.datastax.spark:spark-cassandra-connector_2.11:2.3.0

Please help with this issue, as these metrics are important. Thanks in advance.

Thank you,
Yogesh