Hi,

I am trying to use Spark with Elasticsearch.

Currently, my RDD contains pipe-delimited records, which I save like this:

parsedRDD.saveAsNewAPIHadoopFile(outputLocation,
   NullWritable.class, 
   Text.class, 
   CustomTextOutputFormat.class,
   job.getConfiguration());

Right now I am storing the output in HDFS. Instead, I want to create an 
index, store the output there, and use Kibana to do some analysis. 

What do I need to change so that I can push the data into Elasticsearch? Is it 
EsOutputFormat?
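For reference, here is a rough, untested sketch of what I think the change would look like, based on my reading of the elasticsearch-hadoop docs - "localhost:9200" and the "media/docs" index/type are just placeholders:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.NullWritable;
import org.elasticsearch.hadoop.mr.EsOutputFormat;

// Point the job at Elasticsearch instead of HDFS.
Configuration conf = job.getConfiguration();
conf.set("es.nodes", "localhost:9200");   // placeholder host:port
conf.set("es.resource", "media/docs");    // placeholder target index/type

// EsOutputFormat ignores the output path, so a dummy value is fine here.
parsedRDD.saveAsNewAPIHadoopFile("-",
    NullWritable.class,
    MapWritable.class,      // documents as field -> value maps
    EsOutputFormat.class,
    conf);
```

Since my records are pipe-delimited Text rather than JSON, I assume they would first need to be parsed into MapWritable field/value pairs (or serialized to JSON strings and written as Text with es.input.json set to true).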



On Monday, July 7, 2014 11:14:47 PM UTC+5:30, Costin Leau wrote:
>
> Thanks for the analysis. It looks like Hadoop 1.0.4 has an invalid POM - 
> though the distro ships Jackson 1.8.8, the POM declares version 
> 1.0.1 for some reason. Hadoop version 1.2 (the latest stable) and higher 
> has this fixed.
>
> We don't pin the Jackson version within our POM since it's already 
> available at runtime - we can probably do so going forward in the Spark 
> integration.
>
>
> On Mon, Jul 7, 2014 at 6:39 PM, Brian Thomas <brianjt...@gmail.com 
> <javascript:>> wrote:
>
>> Here is the gradle build I was using originally:
>>
>> apply plugin: 'java'
>> apply plugin: 'eclipse'
>>
>> sourceCompatibility = 1.7
>> version = '0.0.1'
>> group = 'com.spark.testing'
>>
>> repositories {
>>     mavenCentral()
>> }
>>
>> dependencies {
>>     compile 'org.apache.spark:spark-core_2.10:1.0.0'
>>     compile 'edu.stanford.nlp:stanford-corenlp:3.3.1'
>>     compile group: 'edu.stanford.nlp', name: 'stanford-corenlp', version: '3.3.1', classifier: 'models'
>>     compile files('lib/elasticsearch-hadoop-2.0.0.jar')
>>     testCompile 'junit:junit:4.+'
>>     testCompile group: 'com.github.tlrx', name: 'elasticsearch-test', version: '1.2.1'
>> }
>>
>>
>> When I ran dependencyInsight on jackson, I got the following output:
>>
>> C:\dev\workspace\SparkProject>gradle dependencyInsight --dependency 
>> jackson-core
>>
>> :dependencyInsight
>> com.fasterxml.jackson.core:jackson-core:2.3.0
>> \--- com.fasterxml.jackson.core:jackson-databind:2.3.0
>>      +--- org.json4s:json4s-jackson_2.10:3.2.6
>>      |    \--- org.apache.spark:spark-core_2.10:1.0.0
>>      |         \--- compile
>>      \--- com.codahale.metrics:metrics-json:3.0.0
>>           \--- org.apache.spark:spark-core_2.10:1.0.0 (*)
>>
>> org.codehaus.jackson:jackson-core-asl:1.0.1
>> \--- org.codehaus.jackson:jackson-mapper-asl:1.0.1
>>      \--- org.apache.hadoop:hadoop-core:1.0.4
>>           \--- org.apache.hadoop:hadoop-client:1.0.4
>>                \--- org.apache.spark:spark-core_2.10:1.0.0
>>                     \--- compile
>>
>> Version 1.0.1 of jackson-core-asl does not have the field 
>> ALLOW_UNQUOTED_FIELD_NAMES, but later versions of it do.
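>> In case it helps anyone hitting the same conflict: instead of adding the 
>> newer jackson-mapper-asl dependency directly, the version could probably 
>> also be forced in Gradle (untested sketch, using the same configurations 
>> as the build script above):

```groovy
configurations.all {
    resolutionStrategy {
        // Force the Jackson 1.x artifacts Hadoop actually ships with,
        // overriding the bad 1.0.1 version declared in hadoop-core's POM.
        force 'org.codehaus.jackson:jackson-core-asl:1.8.8',
              'org.codehaus.jackson:jackson-mapper-asl:1.8.8'
    }
}
```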
>>
>> On Sunday, July 6, 2014 4:28:56 PM UTC-4, Costin Leau wrote:
>>
>>> Hi,
>>>
>>> Glad to see you sorted out the problem. Out of curiosity what version of 
> jackson were you using and what was pulling it in? Can you share your maven 
>>> pom/gradle build?
>>>
>>>
>>> On Sun, Jul 6, 2014 at 10:27 PM, Brian Thomas <brianjt...@gmail.com> 
>>> wrote:
>>>
>>>> I figured it out - a dependency issue in my classpath. Maven was 
>>>> pulling down a very old version of the Jackson jar. I added the following 
>>>> line to my dependencies and the error went away:
>>>>
>>>> compile 'org.codehaus.jackson:jackson-mapper-asl:1.9.13'
>>>>
>>>>
>>>> On Friday, July 4, 2014 3:22:30 PM UTC-4, Brian Thomas wrote:
>>>>>
>>>>> I am trying to test querying Elasticsearch from Apache Spark using 
>>>>> elasticsearch-hadoop. I am just trying to run a query against the 
>>>>> Elasticsearch server and return the count of results.
>>>>>
>>>>> Below is my test class using the Java API:
>>>>>
>>>>> import org.apache.hadoop.conf.Configuration;
>>>>> import org.apache.hadoop.io.MapWritable;
>>>>> import org.apache.hadoop.io.Text;
>>>>> import org.apache.spark.SparkConf;
>>>>> import org.apache.spark.api.java.JavaPairRDD;
>>>>> import org.apache.spark.api.java.JavaSparkContext;
>>>>> import org.apache.spark.serializer.KryoSerializer;
>>>>> import org.elasticsearch.hadoop.mr.EsInputFormat;
>>>>>
>>>>> import scala.Tuple2;
>>>>>
>>>>> public class ElasticsearchSparkQuery {
>>>>>
>>>>>     public static int query(String masterUrl, String elasticsearchHostPort) {
>>>>>         SparkConf sparkConfig = new SparkConf().setAppName("ESQuery").setMaster(masterUrl);
>>>>>         sparkConfig.set("spark.serializer", KryoSerializer.class.getName());
>>>>>         JavaSparkContext sparkContext = new JavaSparkContext(sparkConfig);
>>>>>
>>>>>         Configuration conf = new Configuration();
>>>>>         conf.setBoolean("mapred.map.tasks.speculative.execution", false);
>>>>>         conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
>>>>>         conf.set("es.nodes", elasticsearchHostPort);
>>>>>         conf.set("es.resource", "media/docs");
>>>>>         conf.set("es.query", "?q=*");
>>>>>
>>>>>         JavaPairRDD<Text, MapWritable> esRDD = sparkContext.newAPIHadoopRDD(conf,
>>>>>                 EsInputFormat.class, Text.class, MapWritable.class);
>>>>>         return (int) esRDD.count();
>>>>>     }
>>>>> }
>>>>>
>>>>>
>>>>> When I try to run this I get the following error:
>>>>>
>>>>>
>>>>> 4/07/04 14:58:07 INFO executor.Executor: Running task ID 0
>>>>> 14/07/04 14:58:07 INFO storage.BlockManager: Found block broadcast_0 
>>>>> locally
>>>>> 14/07/04 14:58:07 INFO rdd.NewHadoopRDD: Input split: ShardInputSplit 
>>>>> [node=[5UATWUzmTUuNzhmGxXWy_w/S'byll|10.45.71.152:9200],shard=0]
>>>>> 14/07/04 14:58:07 WARN mr.EsInputFormat: Cannot determine task id...
>>>>> 14/07/04 14:58:07 ERROR executor.Executor: Exception in task ID 0
>>>>> java.lang.NoSuchFieldError: ALLOW_UNQUOTED_FIELD_NAMES
>>>>>     at org.elasticsearch.hadoop.serialization.json.JacksonJsonParse
>>>>> r.<clinit>(JacksonJsonParser.java:38)
>>>>>     at org.elasticsearch.hadoop.serialization.ScrollReader.read(
>>>>> ScrollReader.java:75)
>>>>>     at org.elasticsearch.hadoop.rest.RestRepository.scroll(RestRepo
>>>>> sitory.java:267)
>>>>>     at org.elasticsearch.hadoop.rest.ScrollQuery.hasNext(ScrollQuer
>>>>> y.java:75)
>>>>>     at org.elasticsearch.hadoop.mr.EsInputFormat$ShardRecordReader.
>>>>> next(EsInputFormat.java:319)
>>>>>     at org.elasticsearch.hadoop.mr.EsInputFormat$ShardRecordReader.
>>>>> nextKeyValue(EsInputFormat.java:255)
>>>>>     at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopR
>>>>> DD.scala:122)
>>>>>     at org.apache.spark.InterruptibleIterator.hasNext(Interruptible
>>>>> Iterator.scala:39)
>>>>>     at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1014)
>>>>>     at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:847)
>>>>>     at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:847)
>>>>>     at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkC
>>>>> ontext.scala:1080)
>>>>>     at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkC
>>>>> ontext.scala:1080)
>>>>>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.sca
>>>>> la:111)
>>>>>     at org.apache.spark.scheduler.Task.run(Task.scala:51)
>>>>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.
>>>>> scala:187)
>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPool
>>>>> Executor.java:1145)
>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoo
>>>>> lExecutor.java:615)
>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>>
>>>>> Has anyone run into this issue with the JacksonJsonParser?
>>>>>
>>>>>  -- 
>>>> You received this message because you are subscribed to the Google 
>>>> Groups "elasticsearch" group.
>>>> To unsubscribe from this group and stop receiving emails from it, send 
>>>> an email to elasticsearc...@googlegroups.com.
>>>> To view this discussion on the web visit https://groups.google.com/d/
>>>> msgid/elasticsearch/9c2b2f2e-5196-4a72-bfbc-4cd0fda9edf0%
>>>> 40googlegroups.com 
>>>> <https://groups.google.com/d/msgid/elasticsearch/9c2b2f2e-5196-4a72-bfbc-4cd0fda9edf0%40googlegroups.com?utm_medium=email&utm_source=footer>
>>>> .
>>>>
>>>> For more options, visit https://groups.google.com/d/optout.
>>>>
>>>
>
>
