Thanks for the quick response. The Spark server is spark-1.2.1-bin-hadoop2.4 from the Spark downloads page. Here is the startup:
radtech>$ ./sbin/start-master.sh
starting org.apache.spark.deploy.master.Master, logging to /usr/local/spark-1.2.1-bin-hadoop2.4/sbin/../logs/spark-tnist-org.apache.spark.deploy.master.Master-1-radtech.io.out
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Spark Command: java -cp ::/usr/local/spark-1.2.1-bin-hadoop2.4/sbin/../conf:/usr/local/spark-1.2.1-bin-hadoop2.4/lib/spark-assembly-1.2.1-hadoop2.4.0.jar:/usr/local/spark-1.2.1-bin-hadoop2.4/lib/datanucleus-api-jdo-3.2.6.jar:/usr/local/spark-1.2.1-bin-hadoop2.4/lib/datanucleus-core-3.2.10.jar:/usr/local/spark-1.2.1-bin-hadoop2.4/lib/datanucleus-rdbms-3.2.9.jar -Dspark.akka.logLifecycleEvents=true -Xms512m -Xmx512m org.apache.spark.deploy.master.Master --ip radtech.io --port 7077 --webui-port 8080
========================================
15/03/18 20:31:40 INFO Master: Registered signal handlers for [TERM, HUP, INT]
15/03/18 20:31:40 INFO SecurityManager: Changing view acls to: tnist
15/03/18 20:31:40 INFO SecurityManager: Changing modify acls to: tnist
15/03/18 20:31:40 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(tnist); users with modify permissions: Set(tnist)
15/03/18 20:31:41 INFO Slf4jLogger: Slf4jLogger started
15/03/18 20:31:41 INFO Remoting: Starting remoting
15/03/18 20:31:41 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkmas...@radtech.io:7077]
15/03/18 20:31:41 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkmas...@radtech.io:7077]
15/03/18 20:31:41 INFO Utils: Successfully started service 'sparkMaster' on port 7077.
15/03/18 20:31:41 INFO Master: Starting Spark master at spark://radtech.io:7077
15/03/18 20:31:41 INFO Utils: Successfully started service 'MasterUI' on port 8080.
15/03/18 20:31:41 INFO MasterWebUI: Started MasterWebUI at http://192.168.1.5:8080
15/03/18 20:31:41 INFO Master: I have been elected leader! New state: ALIVE

My build.sbt for the spark job is as follows:

import AssemblyKeys._ // activating assembly plugin

assemblySettings

name := "elasticsearch-spark"

version := "0.0.1"

val SCALA_VERSION = "2.10.4"
val SPARK_VERSION = "1.2.1"

val defaultSettings = Defaults.coreDefaultSettings ++ Seq(
  organization := "io.radtec",
  scalaVersion := SCALA_VERSION,
  resolvers := Seq(
    //"ods-repo" at "http://artifactory.ods:8082/artifactory/repo",
    Resolver.typesafeRepo("releases")),
  scalacOptions ++= Seq(
    "-unchecked", "-deprecation", "-Xlint", "-Ywarn-dead-code", "-language:_",
    "-target:jvm-1.7", "-encoding", "UTF-8"
  ),
  parallelExecution in Test := false,
  testOptions += Tests.Argument(TestFrameworks.JUnit, "-v"),
  publishArtifact in (Test, packageBin) := true,
  unmanagedSourceDirectories in Compile <<= (scalaSource in Compile)(Seq(_)),
  unmanagedSourceDirectories in Test <<= (scalaSource in Test)(Seq(_)),
  EclipseKeys.createSrc := EclipseCreateSrc.Default + EclipseCreateSrc.Resource,
  credentials += Credentials(Path.userHome / ".ivy2" / ".credentials"),
  publishTo := Some("Artifactory Realm" at "http://artifactory.ods:8082/artifactory/ivy-repo-local")
)

// custom Hadoop client, configured as provided, since it shouldn't go to assembly jar
val hadoopDeps = Seq(
  "org.apache.hadoop" % "hadoop-client" % "2.6.0" % "provided"
)

// ElasticSearch Hadoop support
val esHadoopDeps = Seq(
  ("org.elasticsearch" % "elasticsearch-hadoop" % "2.1.0.BUILD-SNAPSHOT").
    exclude("org.apache.spark", "spark-core_2.10").
    exclude("org.apache.spark", "spark-streaming_2.10").
    exclude("org.apache.spark", "spark-sql_2.10").
exclude("javax.jms", "jms") ) val commonDeps = Seq( "com.eaio.uuid" % "uuid" % "3.2", "joda-time" % "joda-time" % "2.3", "org.joda" % "joda-convert" % "1.6" ) val jsonDeps = Seq( "com.googlecode.json-simple" % "json-simple" % "1.1.1", "com.fasterxml.jackson.core" % "jackson-core" % "2.5.1", "com.fasterxml.jackson.core" % "jackson-annotations" % "2.5.1", "com.fasterxml.jackson.core" % "jackson-databind" % "2.5.1", "com.fasterxml.jackson.module" % "jackson-module-jaxb-annotations" % "2.5.1", "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.5.1", "com.fasterxml.jackson.dataformat" % "jackson-dataformat-xml" % "2.5.1", "com.fasterxml.jackson.datatype" % "jackson-datatype-joda" % "2.5.1" ) val commonTestDeps = Seq( "org.specs2" %% "specs2" % "2.3.11" % "test", "org.mockito" % "mockito-all" % "1.9.5" % "test", "org.scalacheck" %% "scalacheck" % "1.11.3" % "test", "org.scalatest" %% "scalatest" % "1.9.1" % "test") // Project definitions lazy val root = (project in file(".")) .settings(defaultSettings:_*) .settings(libraryDependencies ++= Seq( "com.databricks" %% "spark-csv" % "0.1", // Spark itself, configured as provided, since it shouldn't go to assembly jar "org.apache.spark" %% "spark-core" % SPARK_VERSION % "provided", "org.apache.spark" %% "spark-streaming" % SPARK_VERSION % "provided", "org.apache.spark" %% "spark-sql" % SPARK_VERSION % "provided", "org.apache.spark" %% "spark-hive" % SPARK_VERSION % "provided", ("org.apache.spark" %% "spark-streaming-kafka" % SPARK_VERSION). exclude("org.apache.spark", "spark-core_2.10"). exclude("org.apache.spark", "spark-streaming_2.10"). exclude("org.apache.spark", "spark-sql_2.10"). exclude("javax.jms", "jms"), "org.apache.spark" %% "spark-streaming" % SPARK_VERSION % "test" classifier "tests", "com.typesafe" % "config" % "1.2.1", "com.typesafe.play" %% "play-json" % "2.3.4" ) ++ hadoopDeps ++ esHadoopDeps ++ jsonDeps ++ commonTestDeps ++ commonDeps) resolvers ++= Seq( Resolver.sonatypeRepo("snapshots"), Resolver.sonatypeRepo("public"), "conjars.org" at "http://conjars.org/repo", "JBoss Repository" at "http://repository.jboss.org/nexus/content/repositories/releases/", "Spray Repository" at "http://repo.spray.cc/", "Cloudera Repository" at "https://repository.cloudera.com/artifactory/cloudera-repos/", "Akka Repository" at "http://repo.akka.io/releases/", "Twitter4J Repository" at "http://twitter4j.org/maven2/", "Apache HBase" at "https://repository.apache.org/content/repositories/releases", "Twitter Maven Repo" at "http://maven.twttr.com/", "scala-tools" at "https://oss.sonatype.org/content/groups/scala-tools", "Typesafe repository" at "http://repo.typesafe.com/typesafe/releases/", "Second Typesafe repo" at "http://repo.typesafe.com/typesafe/maven-releases/" ) mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) => { case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard case m if m.startsWith("META-INF") => MergeStrategy.discard case PathList("javax", "servlet", xs @ _*) => MergeStrategy.first case PathList("org", "apache", xs @ _*) => MergeStrategy.first case PathList("org", "jboss", xs @ _*) => MergeStrategy.first case "about.html" => MergeStrategy.rename case "reference.conf" => MergeStrategy.concat case _ => MergeStrategy.first } } Am I by chance missing an exclude that is bring in an older version of spark into the Assembly; hmm need to go look at that. I am using the SNAPSHOT build of elasticsearch-hadoop as it is built against 1.2.1 of spark. 
Per the elasticsearch-hadoop gradle.properties, the Spark version is set to: sparkVersion = 1.2.1

Other than possibly missing an exclude that is bringing in an older version of Spark from somewhere, I do see that I am referencing "org.apache.hadoop" % "hadoop-client" % "2.6.0" % "provided", but I don't think that is the issue.

Any other thoughts?

-Todd

On Wed, Mar 18, 2015 at 8:27 PM, Cheng, Hao <hao.ch...@intel.com> wrote:

> Seems the elasticsearch-hadoop project was built with an old version of
> Spark, and then you upgraded the Spark version in the execution env. As I
> know, the StructField definition changed in Spark 1.2; can you confirm the
> version problem first?
>
> From: Todd Nist [mailto:tsind...@gmail.com]
> Sent: Thursday, March 19, 2015 7:49 AM
> To: user@spark.apache.org
> Subject: [SQL] Elasticsearch-hadoop, exception creating temporary table
>
> I am attempting to access ElasticSearch and expose its data through
> SparkSQL using the elasticsearch-hadoop project. I am encountering the
> following exception when trying to create a temporary table from a resource
> in ElasticSearch:
>
> 15/03/18 07:54:46 INFO DAGScheduler: Job 2 finished: runJob at EsSparkSQL.scala:51, took 0.862184 s
> Create Temporary Table for querying
> Exception in thread "main" java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.types.StructField.<init>(Ljava/lang/String;Lorg/apache/spark/sql/catalyst/types/DataType;Z)V
>     at org.elasticsearch.spark.sql.MappingUtils$.org$elasticsearch$spark$sql$MappingUtils$$convertField(MappingUtils.scala:75)
>     at org.elasticsearch.spark.sql.MappingUtils$$anonfun$convertToStruct$1.apply(MappingUtils.scala:54)
>     at org.elasticsearch.spark.sql.MappingUtils$$anonfun$convertToStruct$1.apply(MappingUtils.scala:54)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>     at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>     at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>     at org.elasticsearch.spark.sql.MappingUtils$.convertToStruct(MappingUtils.scala:54)
>     at org.elasticsearch.spark.sql.MappingUtils$.discoverMapping(MappingUtils.scala:47)
>     at org.elasticsearch.spark.sql.ElasticsearchRelation.lazySchema$lzycompute(DefaultSource.scala:33)
>     at org.elasticsearch.spark.sql.ElasticsearchRelation.lazySchema(DefaultSource.scala:32)
>     at org.elasticsearch.spark.sql.ElasticsearchRelation.<init>(DefaultSource.scala:36)
>     at org.elasticsearch.spark.sql.DefaultSource.createRelation(DefaultSource.scala:20)
>     at org.apache.spark.sql.sources.CreateTableUsing.run(ddl.scala:103)
>     at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:67)
>     at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:67)
>     at org.apache.spark.sql.execution.ExecutedCommand.execute(commands.scala:75)
>     at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:425)
>     at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:425)
>     at org.apache.spark.sql.SchemaRDDLike$class.$init$(SchemaRDDLike.scala:58)
>     at org.apache.spark.sql.SchemaRDD.<init>(SchemaRDD.scala:108)
>     at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:303)
>     at io.radtech.elasticsearch.spark.ElasticSearchReadWrite$.main(ElasticSearchReadWrite.scala:87)
>     at io.radtech.elasticsearch.spark.ElasticSearchReadWrite.main(ElasticSearchReadWrite.scala)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
> I have loaded the "accounts.json" file from ElasticSearch into my
> ElasticSearch cluster. The mapping looks as follows:
>
> radtech:elastic-search-work tnist$ curl -XGET 'http://localhost:9200/bank/_mapping'
> {"bank":{"mappings":{"account":{"properties":{"account_number":{"type":"long"},"address":{"type":"string"},"age":{"type":"long"},"balance":{"type":"long"},"city":{"type":"string"},"email":{"type":"string"},"employer":{"type":"string"},"firstname":{"type":"string"},"gender":{"type":"string"},"lastname":{"type":"string"},"state":{"type":"string"}}}}}}
>
> I can read the data just fine doing the following:
>
>     import java.io.File
>
>     import scala.collection.JavaConversions._
>
>     import org.apache.spark.{SparkConf, SparkContext}
>     import org.apache.spark.SparkContext._
>     import org.apache.spark.rdd.RDD
>     import org.apache.spark.sql.{SchemaRDD, SQLContext}
>
>     // ES imports
>     import org.elasticsearch.spark._
>     import org.elasticsearch.spark.sql._
>
>     import io.radtech.spark.utils.{Settings, Spark, ElasticSearch}
>
>     object ElasticSearchReadWrite {
>
>       /**
>        * Spark specific configuration
>        */
>       def sparkInit(): SparkContext = {
>         val conf = new SparkConf().setAppName(Spark.AppName).setMaster(Spark.Master)
>         conf.set("es.nodes", ElasticSearch.Nodes)
>         conf.set("es.port", ElasticSearch.HttpPort.toString())
>         conf.set("es.index.auto.create", "true");
>         conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
>         conf.set("spark.executor.memory","1g")
>         conf.set("spark.kryoserializer.buffer.mb","256")
>
>         val sparkContext = new SparkContext(conf)
>
>         sparkContext
>       }
>
>       def main(args: Array[String]) {
>
>         val sc = sparkInit
>
>         val sqlContext = new SQLContext(sc)
>         import sqlContext._
>
>         val start = System.currentTimeMillis()
>
>         /*
>          * Read from ES and query with Spark & SparkSQL
>          */
>         val esData = sc.esRDD(s"${ElasticSearch.Index}/${ElasticSearch.Type}")
>
>         esData.collect.foreach(println(_))
>
>         val end = System.currentTimeMillis()
>         println(s"Total time: ${end-start} ms")
>
> This works fine and prints the content of esData out as one would expect:
>
> 15/03/18 07:54:42 INFO DAGScheduler: Job 0 finished: collect at ElasticSearchReadWrite.scala:67, took 6.897443 s
> (4,Map(employer -> Tourmania, city -> Eastvale, address -> 986 Wyckoff Avenue, state -> HI, balance -> 27658, age -> 31, gender -> F, lastname -> Flores, email -> rodriquezflores@tourmania.com, firstname -> Rodriquez, account_number -> 4))
> (9,Map(employer -> Cedward, city -> Olney, address -> 963 Neptune Avenue, state -> OH, balance -> 24776, age -> 39, gender -> M, lastname -> Meadows, email -> opalmeadows@cedward.com, firstname -> Opal, account_number -> 9))
> ...
>
> As does creating a new index and type like this:
>
>         println("read json in and store in ES")
>         // read in JSON and store in ES
>         val path = "document.json"
>         val rdd : SchemaRDD = sqlContext.jsonFile(path)
>
>         rdd.saveToEs("myIndex/myDoc")
>
> However, when I attempt to access the table via the sqlContext like this,
> I get the exception shown above:
>
>         println("Create Temporary Table for querying")
>
>         val schemaRDD: SchemaRDD = sqlContext.sql(
>           "CREATE TEMPORARY TABLE account " +
>           "USING org.elasticsearch.spark.sql " +
>           "OPTIONS (resource 'bank/account') " )
>       }
>     }
>
> I'm using ElasticSearch 1.4.4, spark-1.2.1-bin-hadoop2.4, and the
> elasticsearch-hadoop:
>
>     "org.elasticsearch" % "elasticsearch-hadoop" % "2.1.0.BUILD-SNAPSHOT"
>
> Any insight on what I am doing wrong?
>
> TIA for the assistance.
>
> -Todd
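For what it's worth, the point of that CREATE TEMPORARY TABLE ... USING ... OPTIONS statement is just to make the ES resource queryable from plain SQL. A minimal sketch of the intended follow-on, assuming the table registers successfully (not from the original post; column names are taken from the bank/account mapping shown above):

// Hypothetical follow-on once the temporary table registers; standard Spark SQL 1.2 API.
val accounts = sqlContext.sql(
  "SELECT account_number, firstname, lastname, balance FROM account WHERE balance > 20000")

// In Spark 1.2 this comes back as a SchemaRDD; collect it and print a few rows.
accounts.collect().take(5).foreach(println)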