On Wed, Jul 10, 2024 at 8:49 PM Arun Vigesh <arunvigesh1...@gmail.com>
wrote:

> Hi
>
> I am encountering a "Singleton object not available" error when I set the
> option *spark.cassandra.sql.pushdown.additionalClasses* in order to
> push all filters down to Cassandra. Please find the code and error message below:
>
> *Code:*
> package com.common.reader
>
> import org.apache.spark.sql.{DataFrame, SparkSession}
> import com.datastax.spark.connector.cql.TableDef
> import org.apache.spark.SparkConf
> import org.apache.spark.sql.cassandra.{AnalyzedPredicates,
> CassandraPredicateRules, CassandraSourceRelation}
>
> class CassandraReader(spark: SparkSession) {
>   def read(table: String, keyspace: String, path: String, inc_field:
> String): DataFrame =
>   {
>     var df = spark
>       .read
>       .format("org.apache.spark.sql.cassandra")
>       .option( "table",table)
>       .option( "keyspace",keyspace)
>
>  
> .option("spark.cassandra.sql.pushdown.additionalClasses","com.common.reader.PushdownEverything")
> //      .options(Map(
> //        "keyspace" -> keyspace,
> //        "table" -> table,
> //        "pushdown" -> "true",
> //
> CassandraSourceRelation.AdditionalCassandraPushDownRulesParam.name ->
> "com.common.reader.PushdownEverything"))
>       .load
>     if (inc_field.nonEmpty)
>     {
>       val max_inc_value= spark.sql(s"select max(${inc_field}) from
> delta.`${path}`").first.get(0)
>       println(max_inc_value)
>       df = df.filter(s"${inc_field}>'${max_inc_value}'")
>     }
>     df
>   }
> }
> object PushdownEverything extends CassandraPredicateRules {
>   override def apply(
>                       predicates: AnalyzedPredicates,
>                       tableDef: TableDef,
>                       sparkConf: SparkConf): AnalyzedPredicates = {
>     AnalyzedPredicates(predicates.handledByCassandra ++
> predicates.handledBySpark, Set.empty)
>   }
> }
>
> *Error:*
>
> IllegalArgumentException: Singleton object not available:
> com.common.reader.PushdownEverything
> Caused by: ClassNotFoundException: com.common.reader.PushdownEverything
> at
> com.datastax.spark.connector.util.ReflectionUtil$.findGlobalObject(ReflectionUtil.scala:54)
> at
> com.datastax.spark.connector.datasource.CassandraScanBuilder.$anonfun$additionalRules$1(CassandraScanBuilder.scala:102)
> at
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
> at
> scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
> at
> scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
> at scala.collection.TraversableLike.map(TraversableLike.scala:286)
> at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
> at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
> at
> com.datastax.spark.connector.datasource.CassandraScanBuilder.additionalRules(CassandraScanBuilder.scala:102)
> at
> com.datastax.spark.connector.datasource.CassandraScanBuilder.pushFilters(CassandraScanBuilder.scala:76)
> at
> org.apache.spark.sql.execution.datasources.v2.PushDownUtils$.pushFilters(PushDownUtils.scala:66)
> at
> org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown$$anonfun$pushDownFilters$1.applyOrElse(V2ScanRelationPushDown.scala:73)
> at
> org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown$$anonfun$pushDownFilters$1.applyOrElse(V2ScanRelationPushDown.scala:60)
> at
> org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:465)
> at
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:69)
> at
> org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:465)
> at
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:39)
> at
> org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:339)
> at
> org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:335)
> at
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:39)
> at
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:39)
> at
> org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:470)
> at
> org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1267)
> at
> org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1266)
> at
> org.apache.spark.sql.execution.datasources.WriteFiles.mapChildren(WriteFiles.scala:58)
> at
> org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:470)
> at 
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:39)
> ....
>

Reply via email to