Hi
I am encountering a *Singleton object not available* error when I set the
parameter *spark.cassandra.sql.pushdown.additionalClasses* in order to push
all filters down to Cassandra. Please find the code and error message below:
*Code:*
package com.common.reader
import org.apache.spark.sql.{DataFrame, SparkSession}
import com.datastax.spark.connector.cql.TableDef
import org.apache.spark.SparkConf
import org.apache.spark.sql.cassandra.{AnalyzedPredicates,
CassandraPredicateRules, CassandraSourceRelation}
/**
 * Reads Cassandra tables through the Spark Cassandra data source, optionally
 * restricted to rows that are newer than what a Delta table already holds.
 *
 * @param spark session used both for the Cassandra read and for querying the Delta table
 */
class CassandraReader(spark: SparkSession) {

  /**
   * Loads `keyspace.table` from Cassandra.
   *
   * When `inc_field` is non-empty, the current maximum of that column is read
   * from the Delta table at `path` and only rows with a strictly greater value
   * are kept. If that maximum is NULL (for example, the Delta table has no
   * non-null values yet), no incremental filter is applied and the unfiltered
   * DataFrame is returned, instead of comparing against the literal 'null'.
   *
   * @param table     Cassandra table name
   * @param keyspace  Cassandra keyspace name
   * @param path      location of the Delta table used to find the high-water mark
   * @param inc_field incremental column name; an empty string disables incremental filtering
   * @return the (possibly filtered) Cassandra DataFrame
   */
  def read(table: String, keyspace: String, path: String, inc_field: String): DataFrame = {
    // Alternative way of passing the same settings, kept for reference:
    //   .options(Map(
    //     "keyspace" -> keyspace,
    //     "table" -> table,
    //     "pushdown" -> "true",
    //     CassandraSourceRelation.AdditionalCassandraPushDownRulesParam.name
    //       -> "com.common.reader.PushdownEverything"))
    val source: DataFrame = spark.read
      .format("org.apache.spark.sql.cassandra")
      .option("table", table)
      .option("keyspace", keyspace)
      // Fully qualified name of the extra predicate-rule object; it must be
      // loadable by name at query-planning time.
      .option("spark.cassandra.sql.pushdown.additionalClasses", "com.common.reader.PushdownEverything")
      .load()

    if (inc_field.isEmpty) {
      source
    } else {
      // An aggregate without GROUP BY always yields one row; its single cell is
      // null when there is nothing to aggregate.
      val maxIncValue = spark.sql(s"select max(${inc_field}) from delta.`${path}`").first.get(0)
      println(maxIncValue)
      Option(maxIncValue).fold(source) { highWaterMark =>
        source.filter(s"${inc_field}>'${highWaterMark}'")
      }
    }
  }
}
/**
 * Predicate rule that reassigns every predicate to Cassandra.
 *
 * The result's Cassandra-handled set is the union of the incoming
 * Cassandra-handled and Spark-handled sets, and its Spark-handled set is empty.
 */
object PushdownEverything extends CassandraPredicateRules {

  /**
   * @param predicates predicates as currently split between Cassandra and Spark
   * @param tableDef   definition of the table being read (not consulted by this rule)
   * @param sparkConf  active Spark configuration (not consulted by this rule)
   * @return predicates with everything marked as handled by Cassandra
   */
  override def apply(
      predicates: AnalyzedPredicates,
      tableDef: TableDef,
      sparkConf: SparkConf): AnalyzedPredicates = {
    val allPredicates = predicates.handledByCassandra ++ predicates.handledBySpark
    AnalyzedPredicates(allPredicates, Set.empty)
  }
}
*Error:*
IllegalArgumentException: Singleton object not available:
com.common.reader.PushdownEverything
Caused by: ClassNotFoundException: com.common.reader.PushdownEverything
at
com.datastax.spark.connector.util.ReflectionUtil$.findGlobalObject(ReflectionUtil.scala:54)
at
com.datastax.spark.connector.datasource.CassandraScanBuilder.$anonfun$additionalRules$1(CassandraScanBuilder.scala:102)
at
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
at
scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
at
scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
at scala.collection.TraversableLike.map(TraversableLike.scala:286)
at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
at
com.datastax.spark.connector.datasource.CassandraScanBuilder.additionalRules(CassandraScanBuilder.scala:102)
at
com.datastax.spark.connector.datasource.CassandraScanBuilder.pushFilters(CassandraScanBuilder.scala:76)
at
org.apache.spark.sql.execution.datasources.v2.PushDownUtils$.pushFilters(PushDownUtils.scala:66)
at
org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown$$anonfun$pushDownFilters$1.applyOrElse(V2ScanRelationPushDown.scala:73)
at
org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown$$anonfun$pushDownFilters$1.applyOrElse(V2ScanRelationPushDown.scala:60)
at
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:465)
at
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:69)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:465)
at
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:39)
at
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:339)
at
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:335)
at
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:39)
at
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:39)
at
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:470)
at
org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1267)
at
org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1266)
at
org.apache.spark.sql.execution.datasources.WriteFiles.mapChildren(WriteFiles.scala:58)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:470)
at
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:39)
....
--
**The content of this e-mail is confidential and is intended solely for the
use of the individual or entity to whom it is addressed. If you have
received this e-mail by mistake, please reply to this e-mail and follow
with its deletion. If you are not the intended recipient, please note that
it shall be considered unlawful to copy, forward or in any manner reveal
the contents of this e-mail or any part thereof to anyone. Although
Freshworks has taken reasonable precautions to ensure no malware is present
in this e-mail, Freshworks cannot accept responsibility for any loss or
damage arising from the use of this e-mail or attachments.**