Hi

I am encountering a *Singleton object not available *error when trying to
add the parameter *spark.cassandra.sql.pushdown.additionalClasses* to push
all filters to Cassandra. Please find the code and error message below:

*Code:*

package com.common.reader

import org.apache.spark.sql.{DataFrame, SparkSession}
import com.datastax.spark.connector.cql.TableDef
import org.apache.spark.SparkConf
import org.apache.spark.sql.cassandra.{AnalyzedPredicates,
CassandraPredicateRules, CassandraSourceRelation}

class CassandraReader(spark: SparkSession) {
  def read(table: String, keyspace: String, path: String, inc_field:
String): DataFrame =
  {
    var df = spark
      .read
      .format("org.apache.spark.sql.cassandra")
      .option( "table",table)
      .option( "keyspace",keyspace)
 
.option("spark.cassandra.sql.pushdown.additionalClasses","com.common.reader.PushdownEverything")
//      .options(Map(
//        "keyspace" -> keyspace,
//        "table" -> table,
//        "pushdown" -> "true",
//        CassandraSourceRelation.AdditionalCassandraPushDownRulesParam.name
-> "com.common.reader.PushdownEverything"))
      .load
    if (inc_field.nonEmpty)
    {
      val max_inc_value= spark.sql(s"select max(${inc_field}) from
delta.`${path}`").first.get(0)
      println(max_inc_value)
      df = df.filter(s"${inc_field}>'${max_inc_value}'")
    }
    df
  }
}
object PushdownEverything extends CassandraPredicateRules {
  override def apply(
                      predicates: AnalyzedPredicates,
                      tableDef: TableDef,
                      sparkConf: SparkConf): AnalyzedPredicates = {
    AnalyzedPredicates(predicates.handledByCassandra ++
predicates.handledBySpark, Set.empty)
  }
}

*Error:*

IllegalArgumentException: Singleton object not available:
com.common.reader.PushdownEverything
Caused by: ClassNotFoundException: com.common.reader.PushdownEverything
at
com.datastax.spark.connector.util.ReflectionUtil$.findGlobalObject(ReflectionUtil.scala:54)
at
com.datastax.spark.connector.datasource.CassandraScanBuilder.$anonfun$additionalRules$1(CassandraScanBuilder.scala:102)
at
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
at
scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
at
scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
at scala.collection.TraversableLike.map(TraversableLike.scala:286)
at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
at
com.datastax.spark.connector.datasource.CassandraScanBuilder.additionalRules(CassandraScanBuilder.scala:102)
at
com.datastax.spark.connector.datasource.CassandraScanBuilder.pushFilters(CassandraScanBuilder.scala:76)
at
org.apache.spark.sql.execution.datasources.v2.PushDownUtils$.pushFilters(PushDownUtils.scala:66)
at
org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown$$anonfun$pushDownFilters$1.applyOrElse(V2ScanRelationPushDown.scala:73)
at
org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown$$anonfun$pushDownFilters$1.applyOrElse(V2ScanRelationPushDown.scala:60)
at
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:465)
at
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:69)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:465)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org
$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:39)
at
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:339)
at
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:335)
at
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:39)
at
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:39)
at
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:470)
at
org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1267)
at
org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1266)
at
org.apache.spark.sql.execution.datasources.WriteFiles.mapChildren(WriteFiles.scala:58)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:470)
at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:39)
....

-- 
**The content of this e-mail is confidential and is intended solely for the 
use of the individual or entity to whom it is addressed. If you have 
received this e-mail by mistake, please reply to this e-mail and follow 
with its deletion. If you are not the intended recipient, please note that 
it shall be considered unlawful to copy, forward or in any manner reveal 
the contents of this e-mail or any part thereof to anyone. Although 
Freshworks has taken reasonable precautions to ensure no malware is present 
in this e-mail, Freshworks cannot accept responsibility for any loss or 
damage arising from the use of this e-mail or attachments.**

Reply via email to