Re: [SQL] Elasticsearch-hadoop, exception creating temporary table
Thanks for the assistance. I found the error, and it was something I had done: PEBCAK. I had placed a copy of elasticsearch-hadoop 2.1.0.BETA3 in the project's lib directory, causing it to be treated as an unmanaged dependency and picked up first, even though build.sbt specified the correct version, 2.1.0.BUILD-SNAPSHOT. There was no reason for it to be there at all, and it is not something I usually do. Thanks again for pointing out that it was a version mismatch issue.

-Todd

On Wed, Mar 18, 2015 at 9:59 PM, Cheng, Hao hao.ch...@intel.com wrote:

Todd, can you try running the code in the Spark shell (bin/spark-shell)? You may need to write some fake code to call the functions in MappingUtils.scala. In the meantime, can you also check your project's jar dependency tree, or the downloaded dependency jar files, just in case multiple versions of Spark have been introduced?

From: Todd Nist [mailto:tsind...@gmail.com]
Sent: Thursday, March 19, 2015 9:04 AM
To: Cheng, Hao
Cc: user@spark.apache.org
Subject: Re: [SQL] Elasticsearch-hadoop, exception creating temporary table

Thanks for the quick response. The Spark server is spark-1.2.1-bin-hadoop2.4 from the Spark download. Here is the startup:

radtech$ ./sbin/start-master.sh
starting org.apache.spark.deploy.master.Master, logging to /usr/local/spark-1.2.1-bin-hadoop2.4/sbin/../logs/spark-tnist-org.apache.spark.deploy.master.Master-1-radtech.io.out
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Spark Command: java -cp ::/usr/local/spark-1.2.1-bin-hadoop2.4/sbin/../conf:/usr/local/spark-1.2.1-bin-hadoop2.4/lib/spark-assembly-1.2.1-hadoop2.4.0.jar:/usr/local/spark-1.2.1-bin-hadoop2.4/lib/datanucleus-api-jdo-3.2.6.jar:/usr/local/spark-1.2.1-bin-hadoop2.4/lib/datanucleus-core-3.2.10.jar:/usr/local/spark-1.2.1-bin-hadoop2.4/lib/datanucleus-rdbms-3.2.9.jar -Dspark.akka.logLifecycleEvents=true -Xms512m -Xmx512m org.apache.spark.deploy.master.Master --ip radtech.io --port 7077 --webui-port 8080
15/03/18 20:31:40 INFO Master: Registered signal handlers for [TERM, HUP, INT]
15/03/18 20:31:40 INFO SecurityManager: Changing view acls to: tnist
15/03/18 20:31:40 INFO SecurityManager: Changing modify acls to: tnist
15/03/18 20:31:40 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(tnist); users with modify permissions: Set(tnist)
15/03/18 20:31:41 INFO Slf4jLogger: Slf4jLogger started
15/03/18 20:31:41 INFO Remoting: Starting remoting
15/03/18 20:31:41 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkmas...@radtech.io:7077]
15/03/18 20:31:41 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkmas...@radtech.io:7077]
15/03/18 20:31:41 INFO Utils: Successfully started service 'sparkMaster' on port 7077.
15/03/18 20:31:41 INFO Master: Starting Spark master at spark://radtech.io:7077
15/03/18 20:31:41 INFO Utils: Successfully started service 'MasterUI' on port 8080.
15/03/18 20:31:41 INFO MasterWebUI: Started MasterWebUI at http://192.168.1.5:8080
15/03/18 20:31:41 INFO Master: I have been elected leader! New state: ALIVE

My build.sbt for the Spark job is as follows:

import AssemblyKeys._

// activating assembly plugin
assemblySettings

name := "elasticsearch-spark"

version := "0.0.1"

val SCALA_VERSION = "2.10.4"
val SPARK_VERSION = "1.2.1"

val defaultSettings = Defaults.coreDefaultSettings ++ Seq(
  organization := "io.radtec",
  scalaVersion := SCALA_VERSION,
  resolvers := Seq(
    // "ods-repo" at "http://artifactory.ods:8082/artifactory/repo",
    Resolver.typesafeRepo("releases")),
  scalacOptions ++= Seq(
    "-unchecked", "-deprecation", "-Xlint", "-Ywarn-dead-code",
    "-language:_", "-target:jvm-1.7", "-encoding", "UTF-8"),
  parallelExecution in Test := false,
  testOptions += Tests.Argument(TestFrameworks.JUnit, "-v"),
  publishArtifact in (Test, packageBin) := true,
  unmanagedSourceDirectories in Compile <<= (scalaSource in Compile)(Seq(_)),
  unmanagedSourceDirectories in Test <<= (scalaSource in Test)(Seq(_)),
  EclipseKeys.createSrc := EclipseCreateSrc.Default + EclipseCreateSrc.Resource,
  credentials += Credentials(Path.userHome / ".ivy2" / ".credentials"),
  publishTo := Some("Artifactory Realm" at "http://artifactory.ods:8082/artifactory/ivy-repo-local")
)

// custom Hadoop client, configured as provided, since it shouldn't go to assembly jar
val hadoopDeps = Seq(
  "org.apache.hadoop" % "hadoop-client" % "2.6.0" % "provided"
)

// ElasticSearch Hadoop support
val esHadoopDeps = Seq(
  ("org.elasticsearch" % "elasticsearch-hadoop" % "2.1.0.BUILD-SNAPSHOT").
    exclude("org.apache.spark", "spark-core_2.10").
    exclude("org.apache.spark", "spark-streaming_2.10").
    exclude
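As an aside to the fix above: one quick way to catch this kind of stale-jar problem is to ask the JVM which archive a suspect class was actually loaded from. The snippet below is only a sketch (run it from bin/spark-shell or anywhere the job's classpath is in effect); with the BETA3 jar sitting in the lib directory it would have pointed at elasticsearch-hadoop-2.1.0.BETA3.jar rather than the 2.1.0.BUILD-SNAPSHOT declared in build.sbt.

// Ask the classloader where a suspect class really came from; MappingUtils is the class
// named in the stack trace, but any elasticsearch-hadoop class would do.
val loadedFrom = Class.forName("org.elasticsearch.spark.sql.MappingUtils")
  .getProtectionDomain.getCodeSource.getLocation
println(loadedFrom)  // e.g. file:/.../lib/elasticsearch-hadoop-2.1.0.BETA3.jar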
RE: [SQL] Elasticsearch-hadoop, exception creating temporary table
Seems the elasticsearch-hadoop project was built with an old version of Spark, and the Spark version was then upgraded in the execution environment. As far as I know, the definition of StructField changed in Spark 1.2. Can you confirm the version problem first?

From: Todd Nist [mailto:tsind...@gmail.com]
Sent: Thursday, March 19, 2015 7:49 AM
To: user@spark.apache.org
Subject: [SQL] Elasticsearch-hadoop, exception creating temporary table

I am attempting to access ElasticSearch and expose its data through Spark SQL using the elasticsearch-hadoop project. I am encountering the following exception when trying to create a temporary table from a resource in ElasticSearch:

15/03/18 07:54:46 INFO DAGScheduler: Job 2 finished: runJob at EsSparkSQL.scala:51, took 0.862184 s
Create Temporary Table for querying
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.types.StructField.<init>(Ljava/lang/String;Lorg/apache/spark/sql/catalyst/types/DataType;Z)V
    at org.elasticsearch.spark.sql.MappingUtils$.org$elasticsearch$spark$sql$MappingUtils$$convertField(MappingUtils.scala:75)
    at org.elasticsearch.spark.sql.MappingUtils$$anonfun$convertToStruct$1.apply(MappingUtils.scala:54)
    at org.elasticsearch.spark.sql.MappingUtils$$anonfun$convertToStruct$1.apply(MappingUtils.scala:54)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
    at org.elasticsearch.spark.sql.MappingUtils$.convertToStruct(MappingUtils.scala:54)
    at org.elasticsearch.spark.sql.MappingUtils$.discoverMapping(MappingUtils.scala:47)
    at org.elasticsearch.spark.sql.ElasticsearchRelation.lazySchema$lzycompute(DefaultSource.scala:33)
    at org.elasticsearch.spark.sql.ElasticsearchRelation.lazySchema(DefaultSource.scala:32)
    at org.elasticsearch.spark.sql.ElasticsearchRelation.<init>(DefaultSource.scala:36)
    at org.elasticsearch.spark.sql.DefaultSource.createRelation(DefaultSource.scala:20)
    at org.apache.spark.sql.sources.CreateTableUsing.run(ddl.scala:103)
    at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:67)
    at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:67)
    at org.apache.spark.sql.execution.ExecutedCommand.execute(commands.scala:75)
    at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:425)
    at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:425)
    at org.apache.spark.sql.SchemaRDDLike$class.$init$(SchemaRDDLike.scala:58)
    at org.apache.spark.sql.SchemaRDD.<init>(SchemaRDD.scala:108)
    at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:303)
    at io.radtech.elasticsearch.spark.ElasticSearchReadWrite$.main(ElasticSearchReadWrite.scala:87)
    at io.radtech.elasticsearch.spark.ElasticSearchReadWrite.main(ElasticSearchReadWrite.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

I have loaded the "accounts.json" file from ElasticSearch into my ElasticSearch cluster.
The mapping looks as follows:

radtech:elastic-search-work tnist$ curl -XGET 'http://localhost:9200/bank/_mapping'
{"bank":{"mappings":{"account":{"properties":{"account_number":{"type":"long"},"address":{"type":"string"},"age":{"type":"long"},"balance":{"type":"long"},"city":{"type":"string"},"email":{"type":"string"},"employer":{"type":"string"},"firstname":{"type":"string"},"gender":{"type":"string"},"lastname":{"type":"string"},"state":{"type":"string"}}}}}}

I can read the data just fine doing the following:

import java.io.File

import scala.collection.JavaConversions._

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SchemaRDD, SQLContext}

// ES imports
import org.elasticsearch.spark._
import org.elasticsearch.spark.sql._

import io.radtech.spark.utils.{Settings, Spark, ElasticSearch}

object ElasticSearchReadWrite {

  /**
   * Spark specific configuration
   */
  def sparkInit(): SparkContext = {
    val conf = new SparkConf().setAppName(Spark.AppName).setMaster(Spark.Master)
    conf.set("es.nodes", ElasticSearch.Nodes)
    conf.set("es.port", ElasticSearch.HttpPort.toString())
    conf.set("es.index.auto.create", "true")
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    conf.set("spark.executor.memory", "1g")
    conf.set("spark.kryoserializer.buffer.mb", "256")

    val sparkContext = new SparkContext(conf)
    sparkContext
  }

  def main(args: Array[String]) {
    val sc = sparkInit
    val sqlContext = new SQLContext(sc)
    import sqlContext._

    val start = System.currentTimeMillis
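To make the NoSuchMethodError above concrete: the missing method is the three-argument StructField constructor that the elasticsearch-hadoop jar was compiled against, while the Spark on the runtime classpath declares a different signature. A minimal check from bin/spark-shell, in the spirit of the spark-shell suggestion elsewhere in the thread (a sketch only; the exact output depends on the Spark version):

// Print the constructors that the StructField on this classpath actually provides; the fully
// qualified class name is taken straight from the NoSuchMethodError above.
classOf[org.apache.spark.sql.catalyst.types.StructField]
  .getConstructors
  .foreach(println)

If the only constructor printed takes more than the three arguments (String, DataType, Boolean), a library linked against the old three-argument form will fail at runtime exactly as shown, even though the same source would still compile.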
Re: [SQL] Elasticsearch-hadoop, exception creating temporary table
(
  "com.databricks" %% "spark-csv" % "0.1",
  // Spark itself, configured as provided, since it shouldn't go to assembly jar
  "org.apache.spark" %% "spark-core" % SPARK_VERSION % "provided",
  "org.apache.spark" %% "spark-streaming" % SPARK_VERSION % "provided",
  "org.apache.spark" %% "spark-sql" % SPARK_VERSION % "provided",
  "org.apache.spark" %% "spark-hive" % SPARK_VERSION % "provided",
  ("org.apache.spark" %% "spark-streaming-kafka" % SPARK_VERSION).
    exclude("org.apache.spark", "spark-core_2.10").
    exclude("org.apache.spark", "spark-streaming_2.10").
    exclude("org.apache.spark", "spark-sql_2.10").
    exclude("javax.jms", "jms"),
  "org.apache.spark" %% "spark-streaming" % SPARK_VERSION % "test" classifier "tests",
  "com.typesafe" % "config" % "1.2.1",
  "com.typesafe.play" %% "play-json" % "2.3.4"
) ++ hadoopDeps ++ esHadoopDeps ++ jsonDeps ++ commonTestDeps ++ commonDeps)

resolvers ++= Seq(
  Resolver.sonatypeRepo("snapshots"),
  Resolver.sonatypeRepo("public"),
  "conjars.org" at "http://conjars.org/repo",
  "JBoss Repository" at "http://repository.jboss.org/nexus/content/repositories/releases/",
  "Spray Repository" at "http://repo.spray.cc/",
  "Cloudera Repository" at "https://repository.cloudera.com/artifactory/cloudera-repos/",
  "Akka Repository" at "http://repo.akka.io/releases/",
  "Twitter4J Repository" at "http://twitter4j.org/maven2/",
  "Apache HBase" at "https://repository.apache.org/content/repositories/releases",
  "Twitter Maven Repo" at "http://maven.twttr.com/",
  "scala-tools" at "https://oss.sonatype.org/content/groups/scala-tools",
  "Typesafe repository" at "http://repo.typesafe.com/typesafe/releases/",
  "Second Typesafe repo" at "http://repo.typesafe.com/typesafe/maven-releases/"
)

mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) => {
    case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard
    case m if m.startsWith("META-INF") => MergeStrategy.discard
    case PathList("javax", "servlet", xs @ _*) => MergeStrategy.first
    case PathList("org", "apache", xs @ _*) => MergeStrategy.first
    case PathList("org", "jboss", xs @ _*) => MergeStrategy.first
    case "about.html" => MergeStrategy.rename
    case "reference.conf" => MergeStrategy.concat
    case _ => MergeStrategy.first
  }
}

Am I by chance missing an exclude that is bringing an older version of Spark into the assembly? Hmm, I need to go look at that. I am using the SNAPSHOT build of elasticsearch-hadoop since it is built against Spark 1.2.1. Per the elasticsearch-hadoop gradle.properties, the Spark version is set to:

sparkVersion = 1.2.1

Other than possibly missing an exclude that is bringing in an older version of Spark from somewhere, I do see that I am referencing "org.apache.hadoop" % "hadoop-client" % "2.6.0" % "provided", but I don't think that is the issue. Any other thoughts?

-Todd

On Wed, Mar 18, 2015 at 8:27 PM, Cheng, Hao hao.ch...@intel.com wrote:

Seems the elasticsearch-hadoop project was built with an old version of Spark, and the Spark version was then upgraded in the execution environment. As far as I know, the definition of StructField changed in Spark 1.2. Can you confirm the version problem first?

From: Todd Nist [mailto:tsind...@gmail.com]
Sent: Thursday, March 19, 2015 7:49 AM
To: user@spark.apache.org
Subject: [SQL] Elasticsearch-hadoop, exception creating temporary table

I am attempting to access ElasticSearch and expose its data through Spark SQL using the elasticsearch-hadoop project.
I am encountering the following exception when trying to create a temporary table from a resource in ElasticSearch:

15/03/18 07:54:46 INFO DAGScheduler: Job 2 finished: runJob at EsSparkSQL.scala:51, took 0.862184 s
Create Temporary Table for querying
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.types.StructField.<init>(Ljava/lang/String;Lorg/apache/spark/sql/catalyst/types/DataType;Z)V
    at org.elasticsearch.spark.sql.MappingUtils$.org$elasticsearch$spark$sql$MappingUtils$$convertField(MappingUtils.scala:75)
    at org.elasticsearch.spark.sql.MappingUtils$$anonfun$convertToStruct$1.apply(MappingUtils.scala:54)
    at org.elasticsearch.spark.sql.MappingUtils$$anonfun$convertToStruct$1.apply(MappingUtils.scala:54)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108
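On the question above about a stray exclude or transitive dependency pulling an older Spark into the assembly: the sbt-dependency-graph plugin makes the resolved tree easy to inspect. The line below is an assumption, not part of the build in this thread; it would go into project/plugins.sbt, and the plugin version is only indicative.

// Hypothetical addition to project/plugins.sbt (not part of the original build): adds a
// dependencyTree task that prints every resolved module, so a second Spark version dragged
// in transitively is easy to spot.
addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.7.5")

With that in place, sbt dependencyTree shows the full resolved tree, and on sbt 0.13.6 or later the built-in evicted task reports version conflicts among the spark-* artifacts.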
[SQL] Elasticsearch-hadoop, exception creating temporary table
I am attempting to access ElasticSearch and expose its data through Spark SQL using the elasticsearch-hadoop project. I am encountering the following exception when trying to create a temporary table from a resource in ElasticSearch:

15/03/18 07:54:46 INFO DAGScheduler: Job 2 finished: runJob at EsSparkSQL.scala:51, took 0.862184 s
Create Temporary Table for querying
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.types.StructField.<init>(Ljava/lang/String;Lorg/apache/spark/sql/catalyst/types/DataType;Z)V
    at org.elasticsearch.spark.sql.MappingUtils$.org$elasticsearch$spark$sql$MappingUtils$$convertField(MappingUtils.scala:75)
    at org.elasticsearch.spark.sql.MappingUtils$$anonfun$convertToStruct$1.apply(MappingUtils.scala:54)
    at org.elasticsearch.spark.sql.MappingUtils$$anonfun$convertToStruct$1.apply(MappingUtils.scala:54)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
    at org.elasticsearch.spark.sql.MappingUtils$.convertToStruct(MappingUtils.scala:54)
    at org.elasticsearch.spark.sql.MappingUtils$.discoverMapping(MappingUtils.scala:47)
    at org.elasticsearch.spark.sql.ElasticsearchRelation.lazySchema$lzycompute(DefaultSource.scala:33)
    at org.elasticsearch.spark.sql.ElasticsearchRelation.lazySchema(DefaultSource.scala:32)
    at org.elasticsearch.spark.sql.ElasticsearchRelation.<init>(DefaultSource.scala:36)
    at org.elasticsearch.spark.sql.DefaultSource.createRelation(DefaultSource.scala:20)
    at org.apache.spark.sql.sources.CreateTableUsing.run(ddl.scala:103)
    at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:67)
    at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:67)
    at org.apache.spark.sql.execution.ExecutedCommand.execute(commands.scala:75)
    at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:425)
    at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:425)
    at org.apache.spark.sql.SchemaRDDLike$class.$init$(SchemaRDDLike.scala:58)
    at org.apache.spark.sql.SchemaRDD.<init>(SchemaRDD.scala:108)
    at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:303)
    at io.radtech.elasticsearch.spark.ElasticSearchReadWrite$.main(ElasticSearchReadWrite.scala:87)
    at io.radtech.elasticsearch.spark.ElasticSearchReadWrite.main(ElasticSearchReadWrite.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

I have loaded the "accounts.json" file from ElasticSearch into my ElasticSearch cluster.
The mapping looks as follows:

radtech:elastic-search-work tnist$ curl -XGET 'http://localhost:9200/bank/_mapping'
{"bank":{"mappings":{"account":{"properties":{"account_number":{"type":"long"},"address":{"type":"string"},"age":{"type":"long"},"balance":{"type":"long"},"city":{"type":"string"},"email":{"type":"string"},"employer":{"type":"string"},"firstname":{"type":"string"},"gender":{"type":"string"},"lastname":{"type":"string"},"state":{"type":"string"}}}}}}

I can read the data just fine doing the following:

import java.io.File

import scala.collection.JavaConversions._

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SchemaRDD, SQLContext}

// ES imports
import org.elasticsearch.spark._
import org.elasticsearch.spark.sql._

import io.radtech.spark.utils.{Settings, Spark, ElasticSearch}

object ElasticSearchReadWrite {

  /**
   * Spark specific configuration
   */
  def sparkInit(): SparkContext = {
    val conf = new SparkConf().setAppName(Spark.AppName).setMaster(Spark.Master)
    conf.set("es.nodes", ElasticSearch.Nodes)
    conf.set("es.port", ElasticSearch.HttpPort.toString())
    conf.set("es.index.auto.create", "true")
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    conf.set("spark.executor.memory", "1g")
    conf.set("spark.kryoserializer.buffer.mb", "256")

    val sparkContext = new SparkContext(conf)
    sparkContext
  }

  def main(args: Array[String]) {
    val sc = sparkInit
    val sqlContext = new SQLContext(sc)
    import sqlContext._

    val start = System.currentTimeMillis()

    /*
     * Read from ES and query with Spark SQL
     */
    val esData = sc.esRDD(s"${ElasticSearch.Index}/${ElasticSearch.Type}")

    esData.collect.foreach(println(_))

    val end = System.currentTimeMillis()
    println(s"Total time: ${end - start} ms")

This works fine and prints the content of esData out as one would expect.

15/03/18 07:54:42 INFO DAGScheduler: Job 0 finished: collect at ElasticSearchReadWrite.scala:67, took 6.897443 s
(4,Map(employer -> Tourmania, city -> Eastvale, address -> 986
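The message is truncated here and never shows the statement that actually raises the exception. Judging from the stack trace (SQLContext.sql at ElasticSearchReadWrite.scala:87, running through CreateTableUsing and org.elasticsearch.spark.sql.DefaultSource), it is presumably a CREATE TEMPORARY TABLE ... USING statement along the lines of the sketch below. The table name is invented for illustration; the resource follows the bank/account index and type shown in the mapping.

// Hypothetical reconstruction of the failing step; the real code is cut off above.
println("Create Temporary Table for querying")
sqlContext.sql(
  s"""CREATE TEMPORARY TABLE accounts
     |USING org.elasticsearch.spark.sql
     |OPTIONS (resource '${ElasticSearch.Index}/${ElasticSearch.Type}')""".stripMargin)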