Hi All,

I am trying to use a function in Spark SQL that accepts 2-4 arguments. I got
past the compilation errors, but I hit a runtime exception when invoking the
function from Spark SQL. (The complete stack trace is in the attachment
StackTraceFor_runTestInSQL and is also pasted at the end of this message.)

The function itself works fine when called as a regular Scala function.

Here is how I am trying it:
************************************************************************************************************************************************
// Also tried without the defaulted fmt parameters in the definition; the
// issue persists.
def within10yrs(FromDT: String, ToDT: String,
                fmt1: DateTimeFormatter = dateFormats.YYYYMMDDHHMISS,
                fmt2: DateTimeFormatter = dateFormats.YYYYMMDD): Boolean = {
  println("Compute Within 10 years only if date2 is greater than date1")
  val yrsBetn = Years.yearsBetween(toDateTime(FromDT, fmt1),
                                   toDateTime(ToDT, fmt2)).getYears
  // True only when ToDT is later than FromDT by at most 10 whole years.
  val in10 = yrsBetn match {
    case x if x > 0 && x <= 10 => true
    case _                     => false
  }
  println("FromDT =" + FromDT + "ToDT =" + ToDT + "within10years =" + in10 +
          " actual number of years is " + yrsBetn)
  in10
}

************************************************************************************************************************************************
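For completeness, dateFormats and toDateTime are small Joda-Time helpers. I
have paraphrased them below rather than pasting the real ones; the patterns
are inferred from the sample dates in the standalone test further down, so
treat this as a sketch, not the exact code:

import org.joda.time.DateTime
import org.joda.time.format.{DateTimeFormat, DateTimeFormatter}

// Paraphrased helpers; patterns inferred from "2005-07-18 00:00:00" and "20150719".
object dateFormats {
  val YYYYMMDDHHMISS: DateTimeFormatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")
  val YYYYMMDD: DateTimeFormatter = DateTimeFormat.forPattern("yyyyMMdd")
}

def toDateTime(s: String, fmt: DateTimeFormatter): DateTime = fmt.parseDateTime(s)

************************************************************************************************************************************************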
def runTestInSQL(sc: SparkContext): Unit = {
  val sqlContext = new org.apache.spark.sql.SQLContext(sc)
  sqlContext.setConf("spark.sql.parquet.binaryAsString", "true")
  import sqlContext._

  // Tried both of the variants below to register the function:
  // sqlContext.registerFunction("within10yrs",
  //   (date1: String, date2: String, fmt1: DateTimeFormatter, fmt2: DateTimeFormatter) =>
  //     within10yrs(date1, date2, fmt1, fmt2))
  sqlContext.registerFunction("within10yrs",
    (date1: String, date2: String) => within10yrs(date1, date2))

  val query1 = """SELECT a.col1, a.col2, a.col3, b.col4, b.col5
                  FROM a JOIN b ON a.col1 = b.col1
                  WHERE within10yrs(b.col4, a.col3)
                  GROUP BY a.col1, a.col2, a.col3, b.col4, b.col5"""
  // Table b above is actually a nested query, and the SQL works fine without
  // the "WHERE within10yrs(b.col4, a.col3)" clause, so I am omitting those
  // details to keep the problem description simple.
  val res1 = sqlContext.sql(query1)
  res1.count() // Line 70 in the stack trace (Trial.scala:70)
}

Execution throws a runtime exception:
----------------------------------------------------
// The full stack trace is pasted at the end of this message and attached as
// StackTraceFor_runTestInSQL.
************************************************************************************************************************************************
def testasStdAlone() = {
  val date1 = "2005-07-18 00:00:00"
  val fmt1 = dateFormats.YYYYMMDDHHMISS
  val date2 = "20150719"
  val fmt2 = dateFormats.YYYYMMDD
  println("date1 is " + date1 + " format is " + dateFormats.YYYYMMDDHHMISS)
  println("date2 is " + date2 + " format is " + dateFormats.YYYYMMDD)
  //println("Within 10 years is " + within10yrs(date1, fmt1, date2, fmt2))
  println("Within 10 years is " + within10yrs(date1, date2))
}

def main(x: Array[String]): Unit = {
  val returned = testasStdAlone()
  //testRollup()
  println(returned)
}
Output as expected:
----------------------------
date1 is 2005-07-18 00:00:00 format is org.joda.time.format.DateTimeFormatter@28d101f3
date2 is 20150719 format is org.joda.time.format.DateTimeFormatter@5e411af2
Within 10 years
FromDT =2005-07-18 00:00:00ToDT =20150719within10years =true actual number of years is 10
Within 10 years is true
()
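
In case it helps anyone reproduce this without my helpers, below is a minimal
self-contained sketch that should exercise the same registerFunction/analysis
path (written against the Spark 1.1 API my stack trace corresponds to; the
table name t, the Rec case class, and the trivial UDF are all hypothetical
stand-ins for my real nested query and function). I expect it to reach the
same schemaFor call that appears in the trace:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Hypothetical row type for the sample table.
case class Rec(col1: String, col2: String)

object MinimalUdfRepro {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("udf-repro"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.createSchemaRDD // implicit RDD[Product] => SchemaRDD

    // A trivial UDF with only primitive argument and return types.
    sqlContext.registerFunction("alwaysTrue", (a: String, b: String) => true)

    // Hypothetical in-memory table standing in for the real nested query.
    sc.parallelize(Seq(Rec("a", "b"))).registerTempTable("t")

    val res = sqlContext.sql("SELECT col1 FROM t WHERE alwaysTrue(col1, col2)")
    println(res.count()) // count() is where my real code hits the exception
    sc.stop()
  }
}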


I would appreciate any direction from the community.

Regards,
Sunita
Complete stack trace (StackTraceFor_runTestInSQL):
----------------------------------------------------
Exception in thread "main" scala.reflect.internal.MissingRequirementError: class org.apache.spark.sql.catalyst.ScalaReflection in JavaMirror with primordial classloader with boot classpath [C:\Sunita\eclipse\configuration\org.eclipse.osgi\bundles\294\1\.cp\lib\scala-library.jar;C:\Sunita\eclipse\configuration\org.eclipse.osgi\bundles\294\1\.cp\lib\scala-swing.jar;C:\Sunita\eclipse\configuration\org.eclipse.osgi\bundles\294\1\.cp\lib\scala-actors.jar;C:\Sunita\eclipse\configuration\org.eclipse.osgi\bundles\293\1\.cp\lib\scala-reflect.jar;C:\Sunita\eclipse\configuration\org.eclipse.osgi\bundles\293\1\.cp\lib\scala-compiler.jar;C:\Java\jdk1.7.0_71\jre\lib\resources.jar;C:\Java\jdk1.7.0_71\jre\lib\rt.jar;C:\Java\jdk1.7.0_71\jre\lib\sunrsasign.jar;C:\Java\jdk1.7.0_71\jre\lib\jsse.jar;C:\Java\jdk1.7.0_71\jre\lib\jce.jar;C:\Java\jdk1.7.0_71\jre\lib\charsets.jar;C:\Java\jdk1.7.0_71\jre\lib\jfr.jar;C:\Java\jdk1.7.0_71\jre\classes] not found.
        at scala.reflect.internal.MissingRequirementError$.signal(MissingRequirementError.scala:16)
        at scala.reflect.internal.MissingRequirementError$.notFound(MissingRequirementError.scala:17)
        at scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:48)
        at scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:61)
        at scala.reflect.internal.Mirrors$RootsBase.staticModuleOrClass(Mirrors.scala:72)
        at scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:119)
        at scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:21)
        at org.apache.spark.sql.catalyst.ScalaReflection$$typecreator1$1.apply(ScalaReflection.scala:115)
        at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe$lzycompute(TypeTags.scala:231)
        at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe(TypeTags.scala:231)
        at scala.reflect.api.TypeTags$class.typeOf(TypeTags.scala:335)
        at scala.reflect.api.Universe.typeOf(Universe.scala:59)
        at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:115)
        at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:33)
        at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:100)
        at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:33)
        at org.apache.spark.sql.UDFRegistration$class.builder$3(UdfRegistration.scala:96)
        at org.apache.spark.sql.UDFRegistration$$anonfun$registerFunction$2.apply(UdfRegistration.scala:97)
        at org.apache.spark.sql.UDFRegistration$$anonfun$registerFunction$2.apply(UdfRegistration.scala:97)
        at org.apache.spark.sql.catalyst.analysis.SimpleFunctionRegistry.lookupFunction(FunctionRegistry.scala:53)
        at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$10$$anonfun$applyOrElse$2.applyOrElse(Analyzer.scala:220)
        at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$10$$anonfun$applyOrElse$2.applyOrElse(Analyzer.scala:218)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:144)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$transformExpressionDown$1(QueryPlan.scala:71)
        at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:81)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
        at scala.collection.AbstractIterator.to(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
        at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsDown(QueryPlan.scala:89)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressions(QueryPlan.scala:60)
        at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$10.applyOrElse(Analyzer.scala:218)
        at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$10.applyOrElse(Analyzer.scala:216)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:144)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:162)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
        at scala.collection.AbstractIterator.to(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
        at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildrenDown(TreeNode.scala:191)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:147)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:162)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
        at scala.collection.AbstractIterator.to(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
        at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildrenDown(TreeNode.scala:191)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:147)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:135)
        at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$.apply(Analyzer.scala:216)
        at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$.apply(Analyzer.scala:215)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:61)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:59)
        at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111)
        at scala.collection.immutable.List.foldLeft(List.scala:84)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:59)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:51)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.apply(RuleExecutor.scala:51)
        at org.apache.spark.sql.SQLContext$QueryExecution.analyzed$lzycompute(SQLContext.scala:411)
        at org.apache.spark.sql.SQLContext$QueryExecution.analyzed(SQLContext.scala:411)
        at org.apache.spark.sql.SQLContext$QueryExecution.withCachedData$lzycompute(SQLContext.scala:412)
        at org.apache.spark.sql.SQLContext$QueryExecution.withCachedData(SQLContext.scala:412)
        at org.apache.spark.sql.SQLContext$QueryExecution.optimizedPlan$lzycompute(SQLContext.scala:413)
        at org.apache.spark.sql.SQLContext$QueryExecution.optimizedPlan(SQLContext.scala:413)
        at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:418)
        at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:416)
        at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:422)
        at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:422)
        at org.apache.spark.sql.SchemaRDD.collect(SchemaRDD.scala:444)
        at org.apache.spark.sql.SchemaRDD.count(SchemaRDD.scala:364)
        at com.toyota.customer360.croe.common.runTestInSQL(Trial.scala:70)