Filed https://issues.apache.org/jira/browse/SPARK-6708 to track this.

Cheng

On 4/4/15 10:21 PM, Cheng Lian wrote:
I think this is a bug in Spark SQL that dates back to at least 1.1.0.

The json_tuple function is implemented by org.apache.hadoop.hive.ql.udf.generic.GenericUDTFJSONTuple. If a jar were actually missing, the ClassNotFoundException would complain about that class name rather than the UDTF function name, so the exception message itself points to the real problem.

The problematic line should be this one <https://github.com/apache/spark/blob/9b40c17ab161b64933539abeefde443cb4f98673/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala#L1288>. HiveFunctionWrapper expects the fully qualified class name of the class implementing the UDTF, but we pass in the function name instead.
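
To make the mismatch concrete, here is a simplified sketch (hypothetical local names, not the actual Spark source):

|// Sketch only: "functionName" stands in for the name parsed from the SQL text.
import org.apache.hadoop.hive.ql.exec.FunctionRegistry

val functionName = "json_tuple"

// What that line effectively does today: wrap the SQL-level function name as
// if it were a class name, so createFunction() ends up calling
// Class.forName("json_tuple") and throws ClassNotFoundException:
//   new HiveFunctionWrapper(functionName)

// What should be passed instead: the implementing class, resolved through
// Hive's own function registry.
val udtfClassName =
  FunctionRegistry.getFunctionInfo(functionName).getFunctionClass.getName
// For json_tuple this yields
// "org.apache.hadoop.hive.ql.udf.generic.GenericUDTFJSONTuple".|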

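Until that is fixed, one possible interim approach (a sketch only, assuming pathElements reaches Hive as a JSON string) is the plain get_json_object UDF, which is resolved through Hive's function registry rather than the generator path, so it shouldn't hit this bug. Note that it extracts a fixed array index instead of exploding the array the way LATERAL VIEW json_tuple does:

|// Hedged workaround sketch, not verified against this exact setup.
hiveContext.sql(
    """SELECT path, name, value,
              get_json_object(pathElements, '$[0].node')  AS peName,
              get_json_object(pathElements, '$[0].value') AS peValue
         FROM metric
    """)
    .collect.foreach(println(_))|
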
Thanks for reporting this!

Cheng

On 4/2/15 3:19 AM, Todd Nist wrote:

I have a feeling I’m missing a jar that provides this support, or it may be related to https://issues.apache.org/jira/browse/SPARK-5792. If it is a jar, where would I find it? I would have thought in the $HIVE/lib folder, but I’m not sure which jar contains it.
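
One quick way to check whether the implementing class is actually on the classpath (a hypothetical check, run from the driver; json_tuple is backed by a class in hive-exec):

|// If the providing jar were missing, this would throw ClassNotFoundException;
// if it succeeds, the jar is present and the failure lies elsewhere.
Class.forName("org.apache.hadoop.hive.ql.udf.generic.GenericUDTFJSONTuple")|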

Error:

|Create Metric Temporary Table for querying
15/04/01 14:41:44 INFO HiveMetaStore: 0: Opening raw store with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore
15/04/01 14:41:44 INFO ObjectStore: ObjectStore, initialize called
15/04/01 14:41:45 INFO Persistence: Property hive.metastore.integral.jdo.pushdown unknown - will be ignored
15/04/01 14:41:45 INFO Persistence: Property datanucleus.cache.level2 unknown - will be ignored
15/04/01 14:41:45 INFO BlockManager: Removing broadcast 0
15/04/01 14:41:45 INFO BlockManager: Removing block broadcast_0
15/04/01 14:41:45 INFO MemoryStore: Block broadcast_0 of size 1272 dropped from memory (free 278018571)
15/04/01 14:41:45 INFO BlockManager: Removing block broadcast_0_piece0
15/04/01 14:41:45 INFO MemoryStore: Block broadcast_0_piece0 of size 869 dropped from memory (free 278019440)
15/04/01 14:41:45 INFO BlockManagerInfo: Removed broadcast_0_piece0 on 192.168.1.5:63230 in memory (size: 869.0 B, free: 265.1 MB)
15/04/01 14:41:45 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0
15/04/01 14:41:45 INFO BlockManagerInfo: Removed broadcast_0_piece0 on 192.168.1.5:63278 in memory (size: 869.0 B, free: 530.0 MB)
15/04/01 14:41:45 INFO ContextCleaner: Cleaned broadcast 0
15/04/01 14:41:46 INFO ObjectStore: Setting MetaStore object pin classes with hive.metastore.cache.pinobjtypes="Table,StorageDescriptor,SerDeInfo,Partition,Database,Type,FieldSchema,Order"
15/04/01 14:41:46 INFO Datastore: The class "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as "embedded-only" so does not have its own datastore table.
15/04/01 14:41:46 INFO Datastore: The class "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only" so does not have its own datastore table.
15/04/01 14:41:47 INFO Datastore: The class "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as "embedded-only" so does not have its own datastore table.
15/04/01 14:41:47 INFO Datastore: The class "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only" so does not have its own datastore table.
15/04/01 14:41:47 INFO Query: Reading in results for query "org.datanucleus.store.rdbms.query.SQLQuery@0" since the connection used is closing
15/04/01 14:41:47 INFO ObjectStore: Initialized ObjectStore
15/04/01 14:41:47 INFO HiveMetaStore: Added admin role in metastore
15/04/01 14:41:47 INFO HiveMetaStore: Added public role in metastore
15/04/01 14:41:48 INFO HiveMetaStore: No user is added in admin role, since config is empty
15/04/01 14:41:48 INFO SessionState: No Tez session required at this point. hive.execution.engine=mr.
15/04/01 14:41:49 INFO ParseDriver: Parsing command: SELECT path, name, value, v1.peValue, v1.peName
              FROM metric
              lateral view json_tuple(pathElements, 'name', 'value') v1
                as peName, peValue
15/04/01 14:41:49 INFO ParseDriver: Parse Completed
Exception in thread "main" java.lang.ClassNotFoundException: json_tuple
     at java.net.URLClassLoader$1.run(URLClassLoader.java:372)
     at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
     at java.security.AccessController.doPrivileged(Native Method)
     at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
     at org.apache.spark.sql.hive.HiveFunctionWrapper.createFunction(Shim13.scala:141)
     at org.apache.spark.sql.hive.HiveGenericUdtf.function$lzycompute(hiveUdfs.scala:261)
     at org.apache.spark.sql.hive.HiveGenericUdtf.function(hiveUdfs.scala:261)
     at org.apache.spark.sql.hive.HiveGenericUdtf.outputInspector$lzycompute(hiveUdfs.scala:267)
     at org.apache.spark.sql.hive.HiveGenericUdtf.outputInspector(hiveUdfs.scala:267)
     at org.apache.spark.sql.hive.HiveGenericUdtf.outputDataTypes$lzycompute(hiveUdfs.scala:272)
     at org.apache.spark.sql.hive.HiveGenericUdtf.outputDataTypes(hiveUdfs.scala:272)
     at org.apache.spark.sql.hive.HiveGenericUdtf.makeOutput(hiveUdfs.scala:278)
     at org.apache.spark.sql.catalyst.expressions.Generator.output(generators.scala:60)
     at org.apache.spark.sql.catalyst.plans.logical.Generate$$anonfun$1.apply(basicOperators.scala:50)
     at org.apache.spark.sql.catalyst.plans.logical.Generate$$anonfun$1.apply(basicOperators.scala:50)
     at scala.Option.map(Option.scala:145)
     at org.apache.spark.sql.catalyst.plans.logical.Generate.generatorOutput(basicOperators.scala:50)
     at org.apache.spark.sql.catalyst.plans.logical.Generate.output(basicOperators.scala:60)
     at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveChildren$1.apply(LogicalPlan.scala:118)
     at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveChildren$1.apply(LogicalPlan.scala:118)
     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
     at scala.collection.immutable.List.foreach(List.scala:318)
     at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
     at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
     at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveChildren(LogicalPlan.scala:118)
     at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$6$$anonfun$applyOrElse$1.applyOrElse(Analyzer.scala:159)
     at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$6$$anonfun$applyOrElse$1.applyOrElse(Analyzer.scala:156)
     at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:144)
     at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$transformExpressionDown$1(QueryPlan.scala:71)
     at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1$$anonfun$apply$1.apply(QueryPlan.scala:85)
     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
     at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
     at scala.collection.AbstractTraversable.map(Traversable.scala:105)
     at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:84)
     at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
     at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
     at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
     at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
     at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
     at scala.collection.AbstractIterator.to(Iterator.scala:1157)
     at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
     at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
     at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
     at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
     at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsDown(QueryPlan.scala:89)
     at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressions(QueryPlan.scala:60)
     at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$6.applyOrElse(Analyzer.scala:156)
     at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$6.applyOrElse(Analyzer.scala:153)
     at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:206)
     at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.apply(Analyzer.scala:153)
     at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.apply(Analyzer.scala:152)
     at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:61)
     at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:59)
     at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111)
     at scala.collection.immutable.List.foldLeft(List.scala:84)
     at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:59)
     at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:51)
     at scala.collection.immutable.List.foreach(List.scala:318)
     at org.apache.spark.sql.catalyst.rules.RuleExecutor.apply(RuleExecutor.scala:51)
     at org.apache.spark.sql.SQLContext$QueryExecution.analyzed$lzycompute(SQLContext.scala:411)
     at org.apache.spark.sql.SQLContext$QueryExecution.analyzed(SQLContext.scala:411)
     at org.apache.spark.sql.SQLContext$QueryExecution.withCachedData$lzycompute(SQLContext.scala:412)
     at org.apache.spark.sql.SQLContext$QueryExecution.withCachedData(SQLContext.scala:412)
     at org.apache.spark.sql.SQLContext$QueryExecution.optimizedPlan$lzycompute(SQLContext.scala:413)
     at org.apache.spark.sql.SQLContext$QueryExecution.optimizedPlan(SQLContext.scala:413)
     at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:418)
     at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:416)
     at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:422)
     at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:422)
     at org.apache.spark.sql.SchemaRDD.collect(SchemaRDD.scala:444)
     at com.opsdatastore.elasticsearch.spark.ElasticSearchReadWrite$.main(ElasticSearchReadWrite.scala:119)
     at com.opsdatastore.elasticsearch.spark.ElasticSearchReadWrite.main(ElasticSearchReadWrite.scala)
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
     at java.lang.reflect.Method.invoke(Method.java:483)
     at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
     at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
     at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)|

JSON:

|"metric": {

     "path":"/PA/Pittsburgh/12345 Westbrook Drive/main/theromostat-1",
     "pathElements": [
     {
         "node":"State",
         "value":"PA"
     },
     {
         "node":"City",
         "value":"Pittsburgh"
     },
     {
         "node":"Street",
         "value":"12345 Westbrook Drive"
     },
     {
         "node":"level",
         "value":"main"
     },
     {
         "node":"device",
         "value":"thermostat"
     }
     ],
     "name":"Current Temperature",
     "value":29.590943279257175,
     "timestamp":"2015-03-27T14:53:46+0000"
   }|

Here is the code that produces the error:

|// Spark imports
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._

import org.apache.spark.rdd.RDD

import org.apache.spark.sql.{SchemaRDD, SQLContext}
import org.apache.spark.sql.hive._

// ES imports
import org.elasticsearch.spark._
import org.elasticsearch.spark.sql._

object ElasticSearchReadWrite {

  def main(args: Array[String]) {
    val sc = sparkInit   // helper that builds the SparkContext, defined elsewhere

    @transient
    val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

    import hiveContext._

    val start = System.currentTimeMillis()

    /*
     * Read from ES and provide some insights with SparkSQL
     */
    val esData = sc.esRDD(s"${ElasticSearch.Index}/${ElasticSearch.Type}")

    esData.collect.foreach(println(_))

    val end = System.currentTimeMillis()
    println(s"Total time: ${end - start} ms")

    println("Create Metric Temporary Table for querying")

    // Trailing spaces inside the literals matter: without them the
    // concatenated DDL would read "...metricUSING org.elasticsearch..."
    val schemaRDD = hiveContext.sql(
      "CREATE TEMPORARY TABLE metric " +
      "USING org.elasticsearch.spark.sql " +
      "OPTIONS (resource 'device/metric')")

    hiveContext.sql(
      """SELECT path, name, value, v1.peValue, v1.peName
           FROM metric
           lateral view json_tuple(pathElements, 'name', 'value') v1
             as peName, peValue
      """)
      .collect.foreach(println(_))
  }
}|

More than likely I’m missing a jar, but I’m not sure which one it would be.

-Todd


