Thanks for the quick response.

The spark server is spark-1.2.1-bin-hadoop2.4 from the Spark download. Here
is the startup:

radtech>$ ./sbin/start-master.sh
starting org.apache.spark.deploy.master.Master, logging to
/usr/local/spark-1.2.1-bin-hadoop2.4/sbin/../logs/spark-tnist-org.apache.spark.deploy.master.Master-1-radtech.io.out

Spark assembly has been built with Hive, including Datanucleus jars on classpath
Spark Command: java -cp
::/usr/local/spark-1.2.1-bin-hadoop2.4/sbin/../conf:/usr/local/spark-1.2.1-bin-hadoop2.4/lib/spark-assembly-1.2.1-hadoop2.4.0.jar:/usr/local/spark-1.2.1-bin-hadoop2.4/lib/datanucleus-api-jdo-3.2.6.jar:/usr/local/spark-1.2.1-bin-hadoop2.4/lib/datanucleus-core-3.2.10.jar:/usr/local/spark-1.2.1-bin-hadoop2.4/lib/datanucleus-rdbms-3.2.9.jar
-Dspark.akka.logLifecycleEvents=true -Xms512m -Xmx512m
org.apache.spark.deploy.master.Master --ip radtech.io --port 7077
--webui-port 8080
========================================
15/03/18 20:31:40 INFO Master: Registered signal handlers for [TERM,
HUP, INT]15/03/18 20:31:40 INFO SecurityManager: Changing view acls
to: tnist15/03/18 20:31:40 INFO SecurityManager: Changing modify acls
to: tnist15/03/18 20:31:40 INFO SecurityManager: SecurityManager:
authentication disabled; ui acls disabled; users with view
permissions: Set(tnist); users with modify permissions:
Set(tnist)15/03/18 20:31:41 INFO Slf4jLogger: Slf4jLogger
started15/03/18 20:31:41 INFO Remoting: Starting remoting15/03/18
20:31:41 INFO Remoting: Remoting started; listening on addresses
:[akka.tcp://sparkmas...@radtech.io:7077]15/03/18 20:31:41 INFO
Remoting: Remoting now listens on addresses:
[akka.tcp://sparkmas...@radtech.io:7077]15/03/18 20:31:41 INFO Utils:
Successfully started service 'sparkMaster' on port 7077.15/03/18
20:31:41 INFO Master: Starting Spark master at
spark://radtech.io:707715/03/18 20:31:41 INFO Utils: Successfully
started service 'MasterUI' on port 8080.15/03/18 20:31:41 INFO
MasterWebUI: Started MasterWebUI at http://192.168.1.5:808015/03/18
20:31:41 INFO Master: I have been elected leader! New state: ALIVE

My build.sbt for the spark job is as follows:

import AssemblyKeys._
// activating assembly plugin
assemblySettings

name := "elasticsearch-spark"
version := "0.0.1"

val SCALA_VERSION = "2.10.4"val SPARK_VERSION = "1.2.1"

val defaultSettings = Defaults.coreDefaultSettings ++ Seq(
  organization := "io.radtec",
  scalaVersion := SCALA_VERSION,
  resolvers := Seq(
    //"ods-repo" at "http://artifactory.ods:8082/artifactory/repo";,
    Resolver.typesafeRepo("releases")),
  scalacOptions ++= Seq(
    "-unchecked",
    "-deprecation",
    "-Xlint",
    "-Ywarn-dead-code",
    "-language:_",
    "-target:jvm-1.7",
    "-encoding",
    "UTF-8"
  ),
  parallelExecution in Test := false,
  testOptions += Tests.Argument(TestFrameworks.JUnit, "-v"),
  publishArtifact in (Test, packageBin) := true,
  unmanagedSourceDirectories in Compile <<= (scalaSource in Compile)(Seq(_)),
  unmanagedSourceDirectories in Test <<= (scalaSource in Test)(Seq(_)),
  EclipseKeys.createSrc := EclipseCreateSrc.Default + EclipseCreateSrc.Resource,
  credentials += Credentials(Path.userHome / ".ivy2" / ".credentials"),
  publishTo := Some("Artifactory Realm" at
"http://artifactory.ods:8082/artifactory/ivy-repo-local";)
)
// custom Hadoop client, configured as provided, since it shouldn't go
to assembly jar
val hadoopDeps = Seq (
  "org.apache.hadoop" % "hadoop-client" % "2.6.0" % "provided"
)
// ElasticSearch Hadoop support
val esHadoopDeps = Seq (
  ("org.elasticsearch" % "elasticsearch-hadoop" % "2.1.0.BUILD-SNAPSHOT").
    exclude("org.apache.spark", "spark-core_2.10").
    exclude("org.apache.spark", "spark-streaming_2.10").
    exclude("org.apache.spark", "spark-sql_2.10").
    exclude("javax.jms", "jms")
)

val commonDeps = Seq(
  "com.eaio.uuid"             % "uuid"                  % "3.2",
  "joda-time"                 % "joda-time"             % "2.3",
  "org.joda"                  % "joda-convert"          % "1.6"
)

val jsonDeps = Seq(
  "com.googlecode.json-simple"        % "json-simple"
   % "1.1.1",
  "com.fasterxml.jackson.core"        % "jackson-core"
   % "2.5.1",
  "com.fasterxml.jackson.core"        % "jackson-annotations"
   % "2.5.1",
  "com.fasterxml.jackson.core"        % "jackson-databind"
   % "2.5.1",
  "com.fasterxml.jackson.module"      %
"jackson-module-jaxb-annotations" % "2.5.1",
  "com.fasterxml.jackson.module"     %% "jackson-module-scala"
   % "2.5.1",
  "com.fasterxml.jackson.dataformat"  % "jackson-dataformat-xml"
   % "2.5.1",
  "com.fasterxml.jackson.datatype"    % "jackson-datatype-joda"
   % "2.5.1"
)

val commonTestDeps = Seq(
  "org.specs2"               %% "specs2"                   % "2.3.11"
     % "test",
  "org.mockito"               % "mockito-all"              % "1.9.5"
     % "test",
  "org.scalacheck"           %% "scalacheck"               % "1.11.3"
     % "test",
  "org.scalatest"            %% "scalatest"                % "1.9.1"
     % "test")
// Project definitions

lazy val root = (project in file("."))
        .settings(defaultSettings:_*)
        .settings(libraryDependencies ++= Seq(
                "com.databricks"           %% "spark-csv"             % "0.1",
                // Spark itself, configured as provided, since it
shouldn't go to assembly jar
                "org.apache.spark"         %% "spark-core"
% SPARK_VERSION   % "provided",
                "org.apache.spark"         %% "spark-streaming"
% SPARK_VERSION   % "provided",
                "org.apache.spark"         %% "spark-sql"
% SPARK_VERSION   % "provided",
                "org.apache.spark"         %% "spark-hive"
% SPARK_VERSION   % "provided",
                ("org.apache.spark"        %% "spark-streaming-kafka"
% SPARK_VERSION).
                        exclude("org.apache.spark", "spark-core_2.10").
                        exclude("org.apache.spark", "spark-streaming_2.10").
                        exclude("org.apache.spark", "spark-sql_2.10").
                        exclude("javax.jms", "jms"),
                "org.apache.spark"         %% "spark-streaming"
% SPARK_VERSION   % "test" classifier "tests",
                "com.typesafe"              % "config"                % "1.2.1",
                "com.typesafe.play"        %% "play-json"             % "2.3.4"
            ) ++ hadoopDeps ++ esHadoopDeps ++ jsonDeps ++
commonTestDeps ++ commonDeps)

resolvers ++= Seq(
  Resolver.sonatypeRepo("snapshots"),
  Resolver.sonatypeRepo("public"),
  "conjars.org" at "http://conjars.org/repo";,
  "JBoss Repository" at
"http://repository.jboss.org/nexus/content/repositories/releases/";,
  "Spray Repository" at "http://repo.spray.cc/";,
  "Cloudera Repository" at
"https://repository.cloudera.com/artifactory/cloudera-repos/";,
  "Akka Repository" at "http://repo.akka.io/releases/";,
  "Twitter4J Repository" at "http://twitter4j.org/maven2/";,
  "Apache HBase" at
"https://repository.apache.org/content/repositories/releases";,
  "Twitter Maven Repo" at "http://maven.twttr.com/";,
  "scala-tools" at "https://oss.sonatype.org/content/groups/scala-tools";,
  "Typesafe repository" at "http://repo.typesafe.com/typesafe/releases/";,
  "Second Typesafe repo" at "http://repo.typesafe.com/typesafe/maven-releases/";
)

mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
  {
    case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard
    case m if m.startsWith("META-INF") => MergeStrategy.discard
    case PathList("javax", "servlet", xs @ _*) => MergeStrategy.first
    case PathList("org", "apache", xs @ _*) => MergeStrategy.first
    case PathList("org", "jboss", xs @ _*) => MergeStrategy.first
    case "about.html"  => MergeStrategy.rename
    case "reference.conf" => MergeStrategy.concat
    case _ => MergeStrategy.first
  }
}

Am I by chance missing an exclude that is bring in an older version of
spark into the Assembly; hmm need to go look at that.

I am using the SNAPSHOT build of elasticsearch-hadoop as it is built
against 1.2.1 of spark.  Per the elasticsearch-hadoop gradle.properties the
spark version set to:

sparkVersion = 1.2.1

Other than possibly missing an exclude that is bring in an older version of
Spark from some where, I do see that I am referencing the
"org.apache.hadoop" % "hadoop-client" % "2.6.0" % "provided", but I don't
think that is the issue.

Any other thoughts?

-Todd

On Wed, Mar 18, 2015 at 8:27 PM, Cheng, Hao <hao.ch...@intel.com> wrote:

>  Seems the elasticsearch-hadoop project was built with an old version of
> Spark, and then you upgraded the Spark version in execution env, as I know
> the StructField changed the definition in Spark 1.2, can you confirm the
> version problem first?
>
>
>
> *From:* Todd Nist [mailto:tsind...@gmail.com]
> *Sent:* Thursday, March 19, 2015 7:49 AM
> *To:* user@spark.apache.org
> *Subject:* [SQL] Elasticsearch-hadoop, exception creating temporary table
>
>
>
>
>
> I am attempting to access ElasticSearch and expose it’s data through
> SparkSQL using the elasticsearch-hadoop project.  I am encountering the
> following exception when trying to create a Temporary table from a resource
> in ElasticSearch.:
>
> 15/03/18 07:54:46 INFO DAGScheduler: Job 2 finished: runJob at 
> EsSparkSQL.scala:51, took 0.862184 s
>
> Create Temporary Table for querying
>
> Exception *in* thread "main" java.lang.NoSuchMethodError: 
> org.apache.spark.sql.catalyst.types.StructField.<init>(Ljava/lang/String;Lorg/apache/spark/sql/catalyst/types/DataType;Z)V
>
> at 
> org.elasticsearch.spark.sql.MappingUtils$.org$elasticsearch$spark$sql$MappingUtils$$convertField(MappingUtils.scala:75)
>
> at 
> org.elasticsearch.spark.sql.MappingUtils$$anonfun$convertToStruct$1.apply(MappingUtils.scala:54)
>
> at 
> org.elasticsearch.spark.sql.MappingUtils$$anonfun$convertToStruct$1.apply(MappingUtils.scala:54)
>
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>
> at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>
> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>
> at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>
> at 
> org.elasticsearch.spark.sql.MappingUtils$.convertToStruct(MappingUtils.scala:54)
>
> at 
> org.elasticsearch.spark.sql.MappingUtils$.discoverMapping(MappingUtils.scala:47)
>
> at 
> org.elasticsearch.spark.sql.ElasticsearchRelation.lazySchema$lzycompute(DefaultSource.scala:33)
>
> at 
> org.elasticsearch.spark.sql.ElasticsearchRelation.lazySchema(DefaultSource.scala:32)
>
> at 
> org.elasticsearch.spark.sql.ElasticsearchRelation.<init>(DefaultSource.scala:36)
>
> at 
> org.elasticsearch.spark.sql.DefaultSource.createRelation(DefaultSource.scala:20)
>
> at org.apache.spark.sql.sources.CreateTableUsing.run(ddl.scala:103)
>
> at 
> org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:67)
>
> at 
> org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:67)
>
> at org.apache.spark.sql.execution.ExecutedCommand.execute(commands.scala:75)
>
> at 
> org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:425)
>
> at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:425)
>
> at org.apache.spark.sql.SchemaRDDLike$class.$init$(SchemaRDDLike.scala:58)
>
> at org.apache.spark.sql.SchemaRDD.<init>(SchemaRDD.scala:108)
>
> at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:303)
>
> at 
> io.radtech.elasticsearch.spark.ElasticSearchReadWrite$.main(ElasticSearchReadWrite.scala:87)
>
> at 
> io.radtech.elasticsearch.spark.ElasticSearchReadWrite.main(ElasticSearchReadWrite.scala)
>
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
>
>
>   I have loaded the “accounts.json” file from ElasticSearch into my
> ElasticSearch cluster. The mapping looks as follows:
>
> radtech:elastic-search-work tnist$ curl -XGET 
> 'http://localhost:9200/bank/_mapping'
>
> {"bank":{"mappings":{"account":{"properties":{"account_number":{"type":"long"},"address":{"type":"string"},"age":{"type":"long"},"balance":{"type":"long"},"city":{"type":"string"},"email":{"type":"string"},"employer":{"type":"string"},"firstname":{"type":"string"},"gender":{"type":"string"},"lastname":{"type":"string"},"state":{"type":"string"}}}}}}
>
>  I can read the data just fine doing the following:
>
> import java.io.File
>
>
>
> import scala.collection.JavaConversions._
>
>
>
> import org.apache.spark.{SparkConf, SparkContext}
>
> import org.apache.spark.SparkContext._
>
> import org.apache.spark.rdd.RDD
>
> import org.apache.spark.sql.{SchemaRDD,SQLContext}
>
>
>
> // ES imports
>
> import org.elasticsearch.spark._
>
> import org.elasticsearch.spark.sql._
>
>
>
> import io.radtech.spark.utils.{Settings, Spark, ElasticSearch}
>
>
>
> object ElasticSearchReadWrite {
>
>
>
>   /**
>
>    * Spark specific configuration
>
>    */
>
>   def sparkInit(): SparkContext = {
>
>     val conf = new 
> SparkConf().setAppName(Spark.AppName).setMaster(Spark.Master)
>
>     conf.set("es.nodes", ElasticSearch.Nodes)
>
>     conf.set("es.port", ElasticSearch.HttpPort.toString())
>
>     conf.set("es.index.auto.create", "true");
>
>     conf.set("spark.serializer", 
> "org.apache.spark.serializer.KryoSerializer");
>
>     conf.set("spark.executor.memory","1g")
>
>     conf.set("spark.kryoserializer.buffer.mb","256")
>
>
>
>     val sparkContext = new SparkContext(conf)
>
>
>
>     sparkContext
>
>   }
>
>
>
>   def main(args: Array[String]) {
>
>
>
>     val sc = sparkInit
>
>
>
>     val sqlContext = new SQLContext(sc)
>
>     import sqlContext._
>
>
>
>     val start = System.currentTimeMillis()
>
>
>
>     /*
>
>      * Read from ES and query with with Spark & SparkSQL
>
>      */
>
>     val esData = sc.esRDD(s"${ElasticSearch.Index}/${ElasticSearch.Type}")
>
>
>
>     esData.collect.foreach(println(_))
>
>
>
>     val end = System.currentTimeMillis()
>
>     println(s"Total time: ${end-start} ms")
>
>  This works fine and and prints the content of esData out as one would
> expect.
>
> 15/03/18 07:54:42 INFO DAGScheduler: Job 0 finished: collect at 
> ElasticSearchReadWrite*.*scala:67, took 6.897443 s
>
> (4,*Map*(employer -> Tourmania, city -> Eastvale, address -> 986 Wyckoff 
> Avenue, state -> HI, balance -> 27658, age -> 31, gender -> F, lastname -> 
> Flores, email -> rodriquezflores@tourmania*.*com 
> <rodriquezflo...@tourmania.com>, firstname -> Rodriquez, account_number -> 4))
>
> (9,*Map*(employer -> Cedward, city -> Olney, address -> 963 Neptune Avenue, 
> state -> OH, balance -> 24776, age -> 39, gender -> M, lastname -> Meadows, 
> email -> opalmeadows@cedward*.*com <opalmead...@cedward.com>, firstname -> 
> Opal, account_number -> 9))
>
> ...
>
>  As does creating a new index and type like this:
>
>     println("read json in and store in ES")
>
>     // read *in* JSON *and* store *in* ES
>
>     val path = "document.json"
>
>     val rdd : SchemaRDD = sqlContext.jsonFile(path)
>
>
>
>     rdd.saveToEs("myIndex/myDoc")
>
>  However, when I attempt to access the the table via the sqlContext like
> this I get the exception shown above:
>
>     println("*Create* *Temporary* *Table* *for* querying")
>
>
>
>     val schemaRDD: SchemaRDD = sqlContext.sql(
>
>           "*CREATE* *TEMPORARY* *TABLE* account    " +
>
>           "*USING* org.elasticsearch.spark.*sql* " +
>
>           "OPTIONS (resource 'bank/account')  " )
>
>   }
>
> }
>
>  I'm using ElasticSearch 1.4.4, spark-1.2.1-bin-hadoop2.4, and the
> elasticsearch-hadoop:
>
> "org.elasticsearch" % "elasticsearch-hadoop" % "2.1.0.BUILD-SNAPSHOT"
>
>  Any insight on what I am doing wrong?
>
> TIA for the assistance.
>
>   -Todd
>

Reply via email to