Thanks for the assistance. I found the error; it was something I had done: PEBCAK. I had placed a version of elasticsearch-hadoop 2.1.0.BETA3 in the project/lib directory, causing it to become an unmanaged dependency that was brought in first, even though the build.sbt had the correct version specified, 2.1.0.BUILD-SNAPSHOT
No reason for it to be there at all, and something I don't usually do. Thanks again for pointing out that it was a version mismatch issue. -Todd On Wed, Mar 18, 2015 at 9:59 PM, Cheng, Hao <hao.ch...@intel.com> wrote: > Todd, can you try running the code in the Spark shell (bin/spark-shell)? Maybe > you need to write some fake code to call the function in MappingUtils > .scala. In the meantime, can you also check the jar dependency tree of > your project, or the downloaded dependency jar files, just in case multiple > versions of Spark have been introduced? > > > > From: Todd Nist [mailto:tsind...@gmail.com] > Sent: Thursday, March 19, 2015 9:04 AM > To: Cheng, Hao > Cc: user@spark.apache.org > Subject: Re: [SQL] Elasticsearch-hadoop, exception creating temporary > table > > > > Thanks for the quick response. > > The Spark server is spark-1.2.1-bin-hadoop2.4 from the Spark download. > Here is the startup: > > radtech>$ ./sbin/start-master.sh > > starting org.apache.spark.deploy.master.Master, logging to > /usr/local/spark-1.2.1-bin-hadoop2.4/sbin/../logs/spark-tnist-org.apache.spark.deploy.master.Master-1-radtech.io.out > > > > Spark assembly has been built with Hive, including Datanucleus jars > on classpath > > Spark Command: java -cp > ::/usr/local/spark-1.2.1-bin-hadoop2.4/sbin/../conf:/usr/local/spark-1.2.1-bin-hadoop2.4/lib/spark-assembly-1.2.1-hadoop2.4.0.jar:/usr/local/spark-1.2.1-bin-hadoop2.4/lib/datanucleus-api-jdo-3.2.6.jar:/usr/local/spark-1.2.1-bin-hadoop2.4/lib/datanucleus-core-3.2.10.jar:/usr/local/spark-1.2.1-bin-hadoop2.4/lib/datanucleus-rdbms-3.2.9.jar > -Dspark.akka.logLifecycleEvents=true -Xms512m -Xmx512m > org.apache.spark.deploy.master.Master --ip radtech.io --port 7077 > --webui-port 8080 > > ======================================== > > > > 15/03/18 20:31:40 INFO Master: Registered signal handlers for [TERM, HUP, > INT] > > 15/03/18 20:31:40 INFO SecurityManager: Changing view acls to: tnist > > 15/03/18 20:31:40 
INFO SecurityManager: Changing modify acls to: tnist > > 15/03/18 20:31:40 INFO SecurityManager: SecurityManager: authentication > disabled; ui acls disabled; users with view permissions: Set(tnist); > users with modify permissions: Set(tnist) > > 15/03/18 20:31:41 INFO Slf4jLogger: Slf4jLogger started > > 15/03/18 20:31:41 INFO Remoting: Starting remoting > > 15/03/18 20:31:41 INFO Remoting: Remoting started; listening on addresses > :[akka.tcp://sparkmas...@radtech.io:7077] > > 15/03/18 20:31:41 INFO Remoting: Remoting now listens on addresses: > [akka.tcp://sparkmas...@radtech.io:7077] > > 15/03/18 20:31:41 INFO Utils: Successfully started service 'sparkMaster' on > port 7077. > > 15/03/18 20:31:41 INFO Master: Starting Spark master at > spark://radtech.io:7077 > > 15/03/18 20:31:41 INFO Utils: Successfully started service 'MasterUI' on > port 8080. > > 15/03/18 20:31:41 INFO MasterWebUI: Started MasterWebUI at > http://192.168.1.5:8080 > > 15/03/18 20:31:41 INFO Master: I have been elected leader! 
New state: ALIVE > > My build.sbt for the Spark job is as follows: > > import AssemblyKeys._ > > > > // activating assembly plugin > > assemblySettings > > > > name := "elasticsearch-spark" > > > > version := "0.0.1" > > > > val SCALA_VERSION = "2.10.4" > > > > val SPARK_VERSION = "1.2.1" > > > > val defaultSettings = Defaults.coreDefaultSettings ++ Seq( > > organization := "io.radtec", > > scalaVersion := SCALA_VERSION, > > resolvers := Seq( > > //"ods-repo" at "http://artifactory.ods:8082/artifactory/repo", > > Resolver.typesafeRepo("releases")), > > scalacOptions ++= Seq( > > "-unchecked", > > "-deprecation", > > "-Xlint", > > "-Ywarn-dead-code", > > "-language:_", > > "-target:jvm-1.7", > > "-encoding", > > "UTF-8" > > ), > > parallelExecution in Test := false, > > testOptions += Tests.Argument(TestFrameworks.JUnit, "-v"), > > publishArtifact in (Test, packageBin) := true, > > unmanagedSourceDirectories in Compile <<= (scalaSource in Compile)(Seq(_)), > > unmanagedSourceDirectories in Test <<= (scalaSource in Test)(Seq(_)), > > EclipseKeys.createSrc := EclipseCreateSrc.Default + > EclipseCreateSrc.Resource, > > credentials += Credentials(Path.userHome / ".ivy2" / ".credentials"), > > publishTo := Some("Artifactory Realm" at > "http://artifactory.ods:8082/artifactory/ivy-repo-local") > > ) > > > > // custom Hadoop client, configured as provided, since it shouldn't go to > assembly jar > > val hadoopDeps = Seq ( > > "org.apache.hadoop" % "hadoop-client" % "2.6.0" % "provided" > > ) > > > > // ElasticSearch Hadoop support > > val esHadoopDeps = Seq ( > > ("org.elasticsearch" % "elasticsearch-hadoop" % "2.1.0.BUILD-SNAPSHOT"). > > exclude("org.apache.spark", "spark-core_2.10"). > > exclude("org.apache.spark", "spark-streaming_2.10"). > > exclude("org.apache.spark", "spark-sql_2.10"). 
> > exclude("javax.jms", "jms") > > ) > > > > val commonDeps = Seq( > > "com.eaio.uuid" % "uuid" % "3.2", > > "joda-time" % "joda-time" % "2.3", > > "org.joda" % "joda-convert" % "1.6" > > ) > > > > val jsonDeps = Seq( > > "com.googlecode.json-simple" % "json-simple" % > "1.1.1", > > "com.fasterxml.jackson.core" % "jackson-core" % > "2.5.1", > > "com.fasterxml.jackson.core" % "jackson-annotations" % > "2.5.1", > > "com.fasterxml.jackson.core" % "jackson-databind" % > "2.5.1", > > "com.fasterxml.jackson.module" % "jackson-module-jaxb-annotations" % > "2.5.1", > > "com.fasterxml.jackson.module" %% "jackson-module-scala" % > "2.5.1", > > "com.fasterxml.jackson.dataformat" % "jackson-dataformat-xml" % > "2.5.1", > > "com.fasterxml.jackson.datatype" % "jackson-datatype-joda" % > "2.5.1" > > ) > > > > val commonTestDeps = Seq( > > "org.specs2" %% "specs2" % "2.3.11" % > "test", > > "org.mockito" % "mockito-all" % "1.9.5" % > "test", > > "org.scalacheck" %% "scalacheck" % "1.11.3" % > "test", > > "org.scalatest" %% "scalatest" % "1.9.1" % > "test" > > ) > > > > // Project definitions > > > > lazy val root = (project in file(".")) > > .settings(defaultSettings:_*) > > .settings(libraryDependencies ++= Seq( > > "com.databricks" %% "spark-csv" % "0.1", > > // Spark itself, configured as provided, since it shouldn't > go to assembly jar > > "org.apache.spark" %% "spark-core" % > SPARK_VERSION % "provided", > > "org.apache.spark" %% "spark-streaming" % > SPARK_VERSION % "provided", > > "org.apache.spark" %% "spark-sql" % > SPARK_VERSION % "provided", > > "org.apache.spark" %% "spark-hive" % > SPARK_VERSION % "provided", > > ("org.apache.spark" %% "spark-streaming-kafka" % > SPARK_VERSION). > > exclude("org.apache.spark", "spark-core_2.10"). > > exclude("org.apache.spark", "spark-streaming_2.10"). > > exclude("org.apache.spark", "spark-sql_2.10"). 
> > exclude("javax.jms", "jms"), > > "org.apache.spark" %% "spark-streaming" % > SPARK_VERSION % "test" classifier "tests", > > "com.typesafe" % "config" % > "1.2.1", > > "com.typesafe.play" %% "play-json" % > "2.3.4" > > ) ++ hadoopDeps ++ esHadoopDeps ++ jsonDeps ++ commonTestDeps ++ > commonDeps) > > > > resolvers ++= Seq( > > Resolver.sonatypeRepo("snapshots"), > > Resolver.sonatypeRepo("public"), > > "conjars.org" at "http://conjars.org/repo", > > "JBoss Repository" at > "http://repository.jboss.org/nexus/content/repositories/releases/", > > "Spray Repository" at "http://repo.spray.cc/", > > "Cloudera Repository" at > "https://repository.cloudera.com/artifactory/cloudera-repos/", > > "Akka Repository" at "http://repo.akka.io/releases/", > > "Twitter4J Repository" at "http://twitter4j.org/maven2/", > > "Apache HBase" at > "https://repository.apache.org/content/repositories/releases", > > "Twitter Maven Repo" at "http://maven.twttr.com/", > > "scala-tools" at "https://oss.sonatype.org/content/groups/scala-tools", > > "Typesafe repository" at "http://repo.typesafe.com/typesafe/releases/", > > "Second Typesafe repo" at > "http://repo.typesafe.com/typesafe/maven-releases/" > > ) > > > > mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) => > > { > > case m if m.toLowerCase.endsWith("manifest.mf") => > MergeStrategy.discard > > case m if m.startsWith("META-INF") => MergeStrategy.discard > > case PathList("javax", "servlet", xs @ _*) => MergeStrategy.first > > case PathList("org", "apache", xs @ _*) => MergeStrategy.first > > case PathList("org", "jboss", xs @ _*) => MergeStrategy.first > > case "about.html" => MergeStrategy.rename > > case "reference.conf" => MergeStrategy.concat > > case _ => MergeStrategy.first > > } > > } > > Am I by chance missing an exclude that is bringing an older version of > Spark into the assembly? Hmm, need to go look at that. 
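Beyond per-dependency excludes, sbt can pin a single Spark version across everything that resolves transitively. A hedged sketch against the build above (sbt 0.13-style syntax; it assumes the SPARK_VERSION val already defined in this build.sbt):

```scala
// Sketch only: force every transitive Spark artifact to SPARK_VERSION,
// regardless of which version elasticsearch-hadoop or
// spark-streaming-kafka declares.
dependencyOverrides ++= Set(
  "org.apache.spark" %% "spark-core"      % SPARK_VERSION,
  "org.apache.spark" %% "spark-streaming" % SPARK_VERSION,
  "org.apache.spark" %% "spark-sql"       % SPARK_VERSION
)
```

Note that this only governs managed (Ivy-resolved) dependencies; a jar dropped directly into lib/ or project/lib bypasses resolution entirely and will still shadow the managed version.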
> > I am using the SNAPSHOT build of elasticsearch-hadoop as it is built > against 1.2.1 of Spark. Per the elasticsearch-hadoop gradle.properties, > the Spark version is set to: > > sparkVersion = 1.2.1 > > Other than possibly missing an exclude that is bringing in an older version > of Spark from somewhere, I do see that I am referencing > "org.apache.hadoop" % "hadoop-client" % "2.6.0" % "provided", but I don't > think that is the issue. > > Any other thoughts? > > -Todd > > > > On Wed, Mar 18, 2015 at 8:27 PM, Cheng, Hao <hao.ch...@intel.com> wrote: > > It seems the elasticsearch-hadoop project was built with an old version of > Spark, and then you upgraded the Spark version in the execution env. As far > as I know, StructField changed its definition in Spark 1.2; can you confirm > the version problem first? > > > > From: Todd Nist [mailto:tsind...@gmail.com] > Sent: Thursday, March 19, 2015 7:49 AM > To: user@spark.apache.org > Subject: [SQL] Elasticsearch-hadoop, exception creating temporary table > > > > > > I am attempting to access ElasticSearch and expose its data through > SparkSQL using the elasticsearch-hadoop project. 
I am encountering the > following exception when trying to create a Temporary table from a resource > in ElasticSearch: > > 15/03/18 07:54:46 INFO DAGScheduler: Job 2 finished: runJob at > EsSparkSQL.scala:51, took 0.862184 s > > Create Temporary Table for querying > > Exception in thread "main" java.lang.NoSuchMethodError: > org.apache.spark.sql.catalyst.types.StructField.<init>(Ljava/lang/String;Lorg/apache/spark/sql/catalyst/types/DataType;Z)V > > at > org.elasticsearch.spark.sql.MappingUtils$.org$elasticsearch$spark$sql$MappingUtils$$convertField(MappingUtils.scala:75) > > at > org.elasticsearch.spark.sql.MappingUtils$$anonfun$convertToStruct$1.apply(MappingUtils.scala:54) > > at > org.elasticsearch.spark.sql.MappingUtils$$anonfun$convertToStruct$1.apply(MappingUtils.scala:54) > > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > > at > scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) > > at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) > > at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) > > at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108) > > at > org.elasticsearch.spark.sql.MappingUtils$.convertToStruct(MappingUtils.scala:54) > > at > org.elasticsearch.spark.sql.MappingUtils$.discoverMapping(MappingUtils.scala:47) > > at > org.elasticsearch.spark.sql.ElasticsearchRelation.lazySchema$lzycompute(DefaultSource.scala:33) > > at > org.elasticsearch.spark.sql.ElasticsearchRelation.lazySchema(DefaultSource.scala:32) > > at > org.elasticsearch.spark.sql.ElasticsearchRelation.<init>(DefaultSource.scala:36) > > at > org.elasticsearch.spark.sql.DefaultSource.createRelation(DefaultSource.scala:20) > > at org.apache.spark.sql.sources.CreateTableUsing.run(ddl.scala:103) > > at > 
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:67) > > at > org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:67) > > at org.apache.spark.sql.execution.ExecutedCommand.execute(commands.scala:75) > > at > org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:425) > > at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:425) > > at org.apache.spark.sql.SchemaRDDLike$class.$init$(SchemaRDDLike.scala:58) > > at org.apache.spark.sql.SchemaRDD.<init>(SchemaRDD.scala:108) > > at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:303) > > at > io.radtech.elasticsearch.spark.ElasticSearchReadWrite$.main(ElasticSearchReadWrite.scala:87) > > at > io.radtech.elasticsearch.spark.ElasticSearchReadWrite.main(ElasticSearchReadWrite.scala) > > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > > > > I have loaded the “accounts.json” file from ElasticSearch into my > ElasticSearch cluster. 
The mapping looks as follows: > > radtech:elastic-search-work tnist$ curl -XGET > 'http://localhost:9200/bank/_mapping' > > {"bank":{"mappings":{"account":{"properties":{"account_number":{"type":"long"},"address":{"type":"string"},"age":{"type":"long"},"balance":{"type":"long"},"city":{"type":"string"},"email":{"type":"string"},"employer":{"type":"string"},"firstname":{"type":"string"},"gender":{"type":"string"},"lastname":{"type":"string"},"state":{"type":"string"}}}}}} > > I can read the data just fine doing the following: > > import java.io.File > > > > import scala.collection.JavaConversions._ > > > > import org.apache.spark.{SparkConf, SparkContext} > > import org.apache.spark.SparkContext._ > > import org.apache.spark.rdd.RDD > > import org.apache.spark.sql.{SchemaRDD,SQLContext} > > > > // ES imports > > import org.elasticsearch.spark._ > > import org.elasticsearch.spark.sql._ > > > > import io.radtech.spark.utils.{Settings, Spark, ElasticSearch} > > > > object ElasticSearchReadWrite { > > > > /** > > * Spark specific configuration > > */ > > def sparkInit(): SparkContext = { > > val conf = new > SparkConf().setAppName(Spark.AppName).setMaster(Spark.Master) > > conf.set("es.nodes", ElasticSearch.Nodes) > > conf.set("es.port", ElasticSearch.HttpPort.toString()) > > conf.set("es.index.auto.create", "true"); > > conf.set("spark.serializer", > "org.apache.spark.serializer.KryoSerializer"); > > conf.set("spark.executor.memory","1g") > > conf.set("spark.kryoserializer.buffer.mb","256") > > > > val sparkContext = new SparkContext(conf) > > > > sparkContext > > } > > > > def main(args: Array[String]) { > > > > val sc = sparkInit > > > > val sqlContext = new SQLContext(sc) > > import sqlContext._ > > > > val start = System.currentTimeMillis() > > > > /* > > * Read from ES and query with Spark & SparkSQL > > */ > > val esData = sc.esRDD(s"${ElasticSearch.Index}/${ElasticSearch.Type}") > > > > esData.collect.foreach(println(_)) > > > > val end = 
System.currentTimeMillis() > > println(s"Total time: ${end-start} ms") > > This works fine and prints the content of esData as one would > expect. > > 15/03/18 07:54:42 INFO DAGScheduler: Job 0 finished: collect at > ElasticSearchReadWrite.scala:67, took 6.897443 s > > (4,Map(employer -> Tourmania, city -> Eastvale, address -> 986 Wyckoff > Avenue, state -> HI, balance -> 27658, age -> 31, gender -> F, lastname -> > Flores, email -> rodriquezflores@tourmania.com, firstname -> Rodriquez, account_number -> 4)) > > (9,Map(employer -> Cedward, city -> Olney, address -> 963 Neptune Avenue, > state -> OH, balance -> 24776, age -> 39, gender -> M, lastname -> Meadows, > email -> opalmeadows@cedward.com, firstname -> > Opal, account_number -> 9)) > > ... > > As does creating a new index and type like this: > > println("read json in and store in ES") > > // read in JSON and store in ES > > val path = "document.json" > > val rdd : SchemaRDD = sqlContext.jsonFile(path) > > > > rdd.saveToEs("myIndex/myDoc") > > However, when I attempt to access the table via the sqlContext like > this, I get the exception shown above: > > println("Create Temporary Table for querying") > > > > val schemaRDD: SchemaRDD = sqlContext.sql( > > "CREATE TEMPORARY TABLE account " + > > "USING org.elasticsearch.spark.sql " + > > "OPTIONS (resource 'bank/account') " ) > > } > > } > > I'm using ElasticSearch 1.4.4, spark-1.2.1-bin-hadoop2.4, and the > elasticsearch-hadoop: > > "org.elasticsearch" % "elasticsearch-hadoop" % "2.1.0.BUILD-SNAPSHOT" > > Any insight on what I am doing wrong? > > TIA for the assistance. > > -Todd
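For diagnosing a NoSuchMethodError like the one above, it can help to print which jar a suspect class was actually loaded from; if it points at an unexpected location, a duplicate jar is shadowing the intended one. A small sketch (the helper name is hypothetical; in the real case you would pass the class you suspect, e.g. org.apache.spark.sql.catalyst.types.StructField, demonstrated here with a Scala library class):

```scala
// Sketch: report where on the classpath a class was loaded from,
// to spot a shadowed or duplicate jar. Returns the code-source URL,
// or a marker when the class has no CodeSource (e.g. bootstrap classes).
object WhichJar {
  def locationOf(cls: Class[_]): String =
    Option(cls.getProtectionDomain.getCodeSource)
      .map(_.getLocation.toString)
      .getOrElse("<no code source>")

  def main(args: Array[String]): Unit = {
    // Substitute the Spark/ES class you suspect is version-mismatched:
    println(locationOf(classOf[scala.collection.immutable.List[_]]))
  }
}
```

Running this inside the job (or spark-shell) right before the failing call shows whether the class resolves from the expected assembly or from a stray jar.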