On Wed, Jul 10, 2024 at 8:49 PM Arun Vigesh <arunvigesh1...@gmail.com> wrote:
Hi,

I am encountering a "Singleton object not available" error when trying to add the
parameter *spark.cassandra.sql.pushdown.additionalClasses* to push all filters down
to Cassandra. Please find the code and error message below.

*Code:*

    package com.common.reader

    import org.apache.spark.SparkConf
    import org.apache.spark.sql.{DataFrame, SparkSession}
    import org.apache.spark.sql.cassandra.{AnalyzedPredicates, CassandraPredicateRules, CassandraSourceRelation}
    import com.datastax.spark.connector.cql.TableDef

    class CassandraReader(spark: SparkSession) {
      def read(table: String, keyspace: String, path: String, inc_field: String): DataFrame = {
        var df = spark.read
          .format("org.apache.spark.sql.cassandra")
          .option("table", table)
          .option("keyspace", keyspace)
          // Register the custom rule so that all filters are pushed to Cassandra.
          .option("spark.cassandra.sql.pushdown.additionalClasses",
            "com.common.reader.PushdownEverything")
          // Equivalent form I also tried:
          // .options(Map(
          //   "keyspace" -> keyspace,
          //   "table" -> table,
          //   "pushdown" -> "true",
          //   CassandraSourceRelation.AdditionalCassandraPushDownRulesParam.name ->
          //     "com.common.reader.PushdownEverything"))
          .load

        if (inc_field.nonEmpty) {
          // Incremental read: fetch the current high-water mark from the Delta
          // table at `path` and keep only rows that are newer.
          val max_inc_value = spark.sql(s"select max($inc_field) from delta.`$path`").first.get(0)
          println(max_inc_value)
          df = df.filter(s"$inc_field > '$max_inc_value'")
        }
        df
      }
    }

    // Claims every predicate for Cassandra so that Spark re-evaluates none of them.
    object PushdownEverything extends CassandraPredicateRules {
      override def apply(
          predicates: AnalyzedPredicates,
          tableDef: TableDef,
          sparkConf: SparkConf): AnalyzedPredicates = {
        AnalyzedPredicates(
          predicates.handledByCassandra ++ predicates.handledBySpark,
          Set.empty)
      }
    }
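To illustrate the intent of PushdownEverything: given a split like the one below, it
should claim every filter for Cassandra (the filters here are made-up examples, not
from my actual query):

    import org.apache.spark.sql.sources.{EqualTo, GreaterThan}
    import org.apache.spark.sql.cassandra.AnalyzedPredicates

    // Hypothetical split produced by the default rules: one filter that
    // Cassandra can serve and one that would fall back to Spark.
    val before = AnalyzedPredicates(
      handledByCassandra = Set(EqualTo("id", 1)),
      handledBySpark = Set(GreaterThan("event_ts", "2024-07-01")))

    // Applying PushdownEverything should yield:
    //   AnalyzedPredicates(Set(EqualTo(...), GreaterThan(...)), Set.empty)
    // i.e. every predicate claimed for Cassandra, none left for Spark.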
*Error:*

IllegalArgumentException: Singleton object not available: com.common.reader.PushdownEverything
Caused by: ClassNotFoundException: com.common.reader.PushdownEverything
  at com.datastax.spark.connector.util.ReflectionUtil$.findGlobalObject(ReflectionUtil.scala:54)
  at com.datastax.spark.connector.datasource.CassandraScanBuilder.$anonfun$additionalRules$1(CassandraScanBuilder.scala:102)
  at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
  at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
  at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
  at scala.collection.TraversableLike.map(TraversableLike.scala:286)
  at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
  at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
  at com.datastax.spark.connector.datasource.CassandraScanBuilder.additionalRules(CassandraScanBuilder.scala:102)
  at com.datastax.spark.connector.datasource.CassandraScanBuilder.pushFilters(CassandraScanBuilder.scala:76)
  at org.apache.spark.sql.execution.datasources.v2.PushDownUtils$.pushFilters(PushDownUtils.scala:66)
  at org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown$$anonfun$pushDownFilters$1.applyOrElse(V2ScanRelationPushDown.scala:73)
  at org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown$$anonfun$pushDownFilters$1.applyOrElse(V2ScanRelationPushDown.scala:60)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:465)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:69)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:465)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:39)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:339)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:335)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:39)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:39)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:470)
  at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1267)
  at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1266)
  at org.apache.spark.sql.execution.datasources.WriteFiles.mapChildren(WriteFiles.scala:58)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:470)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:39)
  ....
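The failing frame is ReflectionUtil.findGlobalObject, i.e. the connector cannot see
the class at the point where filter pushdown runs. A quick sanity check for this kind
of ClassNotFoundException is to resolve the singleton by hand on the driver (a
diagnostic sketch; the trailing `$` is the JVM class name the Scala compiler
generates for a singleton object):

    // Diagnostic sketch: verify the singleton is reachable from the driver's
    // classloader. Scala compiles `object PushdownEverything` to a class named
    // "com.common.reader.PushdownEverything$" holding a static MODULE$ field.
    val cls = Class.forName("com.common.reader.PushdownEverything$")
    val singleton = cls.getField("MODULE$").get(null)
    println(singleton) // throws ClassNotFoundException if the jar is not on the classpath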