Re: [SQL] Elasticsearch-hadoop, exception creating temporary table
Thanks for the assistance. I found the error; it was something I had done: PEBCAK. I had placed a copy of elasticsearch-hadoop 2.1.0.BETA3 in the project's lib directory, causing it to be picked up as an unmanaged dependency and brought in first, even though build.sbt had the correct version specified, 2.1.0.BUILD-SNAPSHOT. There was no reason for it to be there at all, and it is not something I usually do. Thanks again for pointing out that it was a version mismatch issue.

-Todd

On Wed, Mar 18, 2015 at 9:59 PM, Cheng, Hao wrote:

> Todd, can you try running the code in the Spark shell (bin/spark-shell)? Maybe you need to write some fake code to call the function in MappingUtils.scala. In the meantime, can you also check the jar dependency tree of your project, or the downloaded dependency jar files, just in case multiple versions of Spark have been introduced?
>
> From: Todd Nist [mailto:tsind...@gmail.com]
> Sent: Thursday, March 19, 2015 9:04 AM
> To: Cheng, Hao
> Cc: user@spark.apache.org
> Subject: Re: [SQL] Elasticsearch-hadoop, exception creating temporary table
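A quick way to see whether a stray jar in lib/ is shadowing a managed artifact is to compare the unmanaged jars with the full compile classpath from the sbt console. This is only a sketch; the key names (unmanagedJars, dependencyClasspath) are standard sbt keys, and the colon-scoped syntax below is for sbt 0.13 (newer sbt uses the Compile / unmanagedJars form):

    sbt
    > show compile:unmanagedJars
    > show compile:dependencyClasspath

If an elasticsearch-hadoop-2.1.0.BETA3.jar shows up under the unmanaged jars while build.sbt declares 2.1.0.BUILD-SNAPSHOT, both end up on the classpath and the older classes can win, which is the kind of setup that produces the NoSuchMethodError reported further down in this thread.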
Re: [SQL] Elasticsearch-hadoop, exception creating temporary table
Thanks for the quick response.

The Spark server is spark-1.2.1-bin-hadoop2.4 from the Spark download. Here is the startup:

radtech>$ ./sbin/start-master.sh
starting org.apache.spark.deploy.master.Master, logging to /usr/local/spark-1.2.1-bin-hadoop2.4/sbin/../logs/spark-tnist-org.apache.spark.deploy.master.Master-1-radtech.io.out

Spark assembly has been built with Hive, including Datanucleus jars on classpath
Spark Command: java -cp ::/usr/local/spark-1.2.1-bin-hadoop2.4/sbin/../conf:/usr/local/spark-1.2.1-bin-hadoop2.4/lib/spark-assembly-1.2.1-hadoop2.4.0.jar:/usr/local/spark-1.2.1-bin-hadoop2.4/lib/datanucleus-api-jdo-3.2.6.jar:/usr/local/spark-1.2.1-bin-hadoop2.4/lib/datanucleus-core-3.2.10.jar:/usr/local/spark-1.2.1-bin-hadoop2.4/lib/datanucleus-rdbms-3.2.9.jar -Dspark.akka.logLifecycleEvents=true -Xms512m -Xmx512m org.apache.spark.deploy.master.Master --ip radtech.io --port 7077 --webui-port 8080

15/03/18 20:31:40 INFO Master: Registered signal handlers for [TERM, HUP, INT]
15/03/18 20:31:40 INFO SecurityManager: Changing view acls to: tnist
15/03/18 20:31:40 INFO SecurityManager: Changing modify acls to: tnist
15/03/18 20:31:40 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(tnist); users with modify permissions: Set(tnist)
15/03/18 20:31:41 INFO Slf4jLogger: Slf4jLogger started
15/03/18 20:31:41 INFO Remoting: Starting remoting
15/03/18 20:31:41 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkmas...@radtech.io:7077]
15/03/18 20:31:41 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkmas...@radtech.io:7077]
15/03/18 20:31:41 INFO Utils: Successfully started service 'sparkMaster' on port 7077.
15/03/18 20:31:41 INFO Master: Starting Spark master at spark://radtech.io:7077
15/03/18 20:31:41 INFO Utils: Successfully started service 'MasterUI' on port 8080.
15/03/18 20:31:41 INFO MasterWebUI: Started MasterWebUI at http://192.168.1.5:8080
15/03/18 20:31:41 INFO Master: I have been elected leader! New state: ALIVE

My build.sbt for the spark job is as follows:

import AssemblyKeys._

// activating assembly plugin
assemblySettings

name := "elasticsearch-spark"

version := "0.0.1"

val SCALA_VERSION = "2.10.4"
val SPARK_VERSION = "1.2.1"

val defaultSettings = Defaults.coreDefaultSettings ++ Seq(
  organization := "io.radtec",
  scalaVersion := SCALA_VERSION,
  resolvers := Seq(
    //"ods-repo" at "http://artifactory.ods:8082/artifactory/repo",
    Resolver.typesafeRepo("releases")),
  scalacOptions ++= Seq(
    "-unchecked",
    "-deprecation",
    "-Xlint",
    "-Ywarn-dead-code",
    "-language:_",
    "-target:jvm-1.7",
    "-encoding", "UTF-8"
  ),
  parallelExecution in Test := false,
  testOptions += Tests.Argument(TestFrameworks.JUnit, "-v"),
  publishArtifact in (Test, packageBin) := true,
  unmanagedSourceDirectories in Compile <<= (scalaSource in Compile)(Seq(_)),
  unmanagedSourceDirectories in Test <<= (scalaSource in Test)(Seq(_)),
  EclipseKeys.createSrc := EclipseCreateSrc.Default + EclipseCreateSrc.Resource,
  credentials += Credentials(Path.userHome / ".ivy2" / ".credentials"),
  publishTo := Some("Artifactory Realm" at "http://artifactory.ods:8082/artifactory/ivy-repo-local")
)

// custom Hadoop client, configured as provided, since it shouldn't go to assembly jar
val hadoopDeps = Seq(
  "org.apache.hadoop" % "hadoop-client" % "2.6.0" % "provided"
)

// ElasticSearch Hadoop support
val esHadoopDeps = Seq(
  ("org.elasticsearch" % "elasticsearch-hadoop" % "2.1.0.BUILD-SNAPSHOT").
    exclude("org.apache.spark", "spark-core_2.10").
    exclude("org.apache.spark", "spark-streaming_2.10").
    exclude("org.apache.spark", "spark-sql_2.10").
    exclude("javax.jms", "jms")
)

val commonDeps = Seq(
  "com.eaio.uuid" % "uuid"         % "3.2",
  "joda-time"     % "joda-time"    % "2.3",
  "org.joda"      % "joda-convert" % "1.6"
)

val jsonDeps = Seq(
  "com.googlecode.json-simple"       %  "json-simple"                     % "1.1.1",
  "com.fasterxml.jackson.core"       %  "jackson-core"                    % "2.5.1",
  "com.fasterxml.jackson.core"       %  "jackson-annotations"             % "2.5.1",
  "com.fasterxml.jackson.core"       %  "jackson-databind"                % "2.5.1",
  "com.fasterxml.jackson.module"     %  "jackson-module-jaxb-annotations" % "2.5.1",
  "com.fasterxml.jackson.module"     %% "jackson-module-scala"            % "2.5.1",
  "com.fasterxml.jackson.dataformat" %  "jackson-dataformat-xml"          % "2.5.1",
  "com.fasterxml.jackson.datatype"   %  "jackson-datatype-joda"           % "2.5.1"
)

val commonTestDeps = Seq(
  "org.specs2"     %% "specs2"      % "2.3.11" % "test",
  "org.mockito"    %  "mockito-all" % "1.9.5"  % "test",
  "org.scalacheck" %% "scalacheck"  % "1.11.3" % "test",
  "org.scalatest"  %% "scalatest"   % "1.9.1"
RE: [SQL] Elasticsearch-hadoop, exception creating temporary table
It seems the elasticsearch-hadoop project was built with an old version of Spark, and then you upgraded the Spark version in the execution environment. As far as I know, StructField changed its definition in Spark 1.2; can you confirm the version problem first?

From: Todd Nist [mailto:tsind...@gmail.com]
Sent: Thursday, March 19, 2015 7:49 AM
To: user@spark.apache.org
Subject: [SQL] Elasticsearch-hadoop, exception creating temporary table

I am attempting to access ElasticSearch and expose its data through SparkSQL using the elasticsearch-hadoop project. I am encountering the following exception when trying to create a temporary table from a resource in ElasticSearch:

15/03/18 07:54:46 INFO DAGScheduler: Job 2 finished: runJob at EsSparkSQL.scala:51, took 0.862184 s
Create Temporary Table for querying
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.types.StructField.<init>(Ljava/lang/String;Lorg/apache/spark/sql/catalyst/types/DataType;Z)V
    at org.elasticsearch.spark.sql.MappingUtils$.org$elasticsearch$spark$sql$MappingUtils$$convertField(MappingUtils.scala:75)
    at org.elasticsearch.spark.sql.MappingUtils$$anonfun$convertToStruct$1.apply(MappingUtils.scala:54)
    at org.elasticsearch.spark.sql.MappingUtils$$anonfun$convertToStruct$1.apply(MappingUtils.scala:54)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
    at org.elasticsearch.spark.sql.MappingUtils$.convertToStruct(MappingUtils.scala:54)
    at org.elasticsearch.spark.sql.MappingUtils$.discoverMapping(MappingUtils.scala:47)
    at org.elasticsearch.spark.sql.ElasticsearchRelation.lazySchema$lzycompute(DefaultSource.scala:33)
    at org.elasticsearch.spark.sql.ElasticsearchRelation.lazySchema(DefaultSource.scala:32)
    at org.elasticsearch.spark.sql.ElasticsearchRelation.<init>(DefaultSource.scala:36)
    at org.elasticsearch.spark.sql.DefaultSource.createRelation(DefaultSource.scala:20)
    at org.apache.spark.sql.sources.CreateTableUsing.run(ddl.scala:103)
    at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:67)
    at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:67)
    at org.apache.spark.sql.execution.ExecutedCommand.execute(commands.scala:75)
    at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:425)
    at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:425)
    at org.apache.spark.sql.SchemaRDDLike$class.$init$(SchemaRDDLike.scala:58)
    at org.apache.spark.sql.SchemaRDD.<init>(SchemaRDD.scala:108)
    at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:303)
    at io.radtech.elasticsearch.spark.ElasticSearchReadWrite$.main(ElasticSearchReadWrite.scala:87)
    at io.radtech.elasticsearch.spark.ElasticSearchReadWrite.main(ElasticSearchReadWrite.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
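For context on the NoSuchMethodError above, this is the kind of binary incompatibility Cheng Hao is describing: a jar compiled against an older catalyst StructField calls a constructor that no longer exists at runtime. The sketch below is from memory and should be verified against the actual Spark sources; it is illustrative only.

// Signatures as I recall them (verify against Spark sources):
//   pre-1.2:  case class StructField(name: String, dataType: DataType, nullable: Boolean)
//   1.2.x:    case class StructField(name: String, dataType: DataType, nullable: Boolean,
//                                    metadata: Metadata = Metadata.empty)
// The default argument keeps source compatibility, but bytecode compiled against the
// three-argument constructor fails on a 1.2 runtime with exactly the error shown above:
//   StructField.<init>(Ljava/lang/String;...DataType;Z)V
import org.apache.spark.sql.catalyst.types.{StringType, StructField, StructType}

// Recompiling against the Spark version on the runtime classpath resolves the call cleanly:
val field  = StructField("firstname", StringType, nullable = true)
val schema = StructType(Seq(field))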
I have loaded the "accounts.json" file into my ElasticSearch cluster. The mapping looks as follows:

radtech:elastic-search-work tnist$ curl -XGET 'http://localhost:9200/bank/_mapping'
{"bank":{"mappings":{"account":{"properties":{"account_number":{"type":"long"},"address":{"type":"string"},"age":{"type":"long"},"balance":{"type":"long"},"city":{"type":"string"},"email":{"type":"string"},"employer":{"type":"string"},"firstname":{"type":"string"},"gender":{"type":"string"},"lastname":{"type":"string"},"state":{"type":"string"}}}}}}

I can read the data just fine doing the following:

import java.io.File

import scala.collection.JavaConversions._

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SchemaRDD, SQLContext}

// ES imports
import org.elasticsearch.spark._
import org.elasticsearch.spark.sql._

import io.radtech.spark.utils.{Settings, Spark, ElasticSearch}

object ElasticSearchReadWrite {

  /**
   * Spark specific configuration
   */
  def sparkInit(): SparkContext = {
    val conf = new SparkConf().setAppName(Spark.AppName).setMaster(Spark.Master)
    conf.set("es.nodes", ElasticSearch.Nodes)
    conf.set("es.port", ElasticSearch.HttpPort.toString())
    conf.set("es.index.auto.create", "true");
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
    conf.set("spark.executor.memory", "1g")
    conf.set("spark.kryoserializer.buffer.mb", "256")

    val sparkContext = new SparkContext(conf)
    sparkContext
  }

  def main(args: Array[String]) {
    val sc = sparkInit
    val sqlContext = new SQLContext(sc)
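The code above is cut off before the call that actually fails (the sqlContext.sql invocation at ElasticSearchReadWrite.scala:87 in the stack trace). For readers following along, here is a minimal sketch of what creating the temporary table typically looks like with the elasticsearch-hadoop Spark SQL data source; this is illustrative rather than Todd's actual code, and the table name is arbitrary, while the 'bank/account' resource matches the mapping shown above:

// Illustrative only: register an ES index/type as a temporary table and query it.
sqlContext.sql(
  """CREATE TEMPORARY TABLE accounts
    |USING org.elasticsearch.spark.sql
    |OPTIONS (resource 'bank/account')""".stripMargin)

val results = sqlContext.sql("SELECT firstname, lastname, balance FROM accounts WHERE balance > 30000")
results.collect().foreach(println)

With matching Spark and elasticsearch-hadoop versions on the classpath, this is the path that goes through CreateTableUsing and MappingUtils.discoverMapping in the stack trace above.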