Another question: in foreachRDD I will initialize the JobConf, but at that point how can I get information from the items? I have an identifier in the data which identifies the required ES index, so how can I set a dynamic index in foreachRDD?
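For the dynamic-index question, elasticsearch-hadoop supports "multi-resource" writes: a `{fieldName}` pattern in `es.resource.write` is resolved against each document at write time, so the index can come from the data itself rather than from the JobConf. A minimal sketch, with the caveat that the `esIndex` field name is a placeholder for your own identifier field, and the `EsOutputFormat` class name should be checked against your elasticsearch-hadoop version (it was `ESOutputFormat` in older 1.x releases):

```scala
import org.apache.hadoop.mapred.JobConf
import org.elasticsearch.hadoop.mr.EsOutputFormat

stream.foreachRDD { (rdd, time) =>
  val jobConf = new JobConf()
  jobConf.set("es.nodes", "localhost:9200")
  // "{esIndex}" is resolved per document: each record goes to the index
  // named by its own esIndex field, so no static index is needed here.
  jobConf.set("es.resource.write", "{esIndex}/docs")
  jobConf.setOutputFormat(classOf[EsOutputFormat])
  // prepareDataForIndexing should emit (key, value) pairs in the Writable
  // form elasticsearch-hadoop expects, e.g. (NullWritable, MapWritable).
  rdd.map(prepareDataForIndexing).saveAsHadoopDataset(jobConf)
}
```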
b0c1
----------------------------------------------------------------------------------------------------------------------------------
Skype: boci13, Hangout: boci.b...@gmail.com


On Fri, Jun 27, 2014 at 12:30 AM, Holden Karau <hol...@pigscanfly.ca> wrote:

> Just your luck, I happened to be working on that very talk today :) Let me
> know how your experiences with Elasticsearch & Spark go :)
>
> On Thu, Jun 26, 2014 at 3:17 PM, boci <boci.b...@gmail.com> wrote:
>
>> Wow, thanks for your fast answer, it helps a lot.
>>
>> On Thu, Jun 26, 2014 at 11:48 PM, Holden Karau <hol...@pigscanfly.ca> wrote:
>>
>>> Hi b0c1,
>>>
>>> I have an example of how to do this in the repo for my talk as well; the
>>> specific example is at
>>> https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/IndexTweetsLive.scala
>>> Since DStream doesn't have a saveAsHadoopDataset, we use foreachRDD and
>>> then call saveAsHadoopDataset on the RDD that gets passed into the
>>> function we provide to foreachRDD, e.g.:
>>>
>>> stream.foreachRDD { (data, time) =>
>>>   val jobConf = ...
>>>   data.map(prepareDataForIndexing).saveAsHadoopDataset(jobConf)
>>> }
>>>
>>> Hope that helps :)
>>>
>>> Cheers,
>>>
>>> Holden :)
>>>
>>> On Thu, Jun 26, 2014 at 2:23 PM, boci <boci.b...@gmail.com> wrote:
>>>
>>>> Thanks. Without the local option I can connect to the remote ES; now I
>>>> only have one problem: how can I use elasticsearch-hadoop with Spark
>>>> Streaming? I mean, DStream doesn't have a "saveAsHadoopFiles" method,
>>>> and my second problem is that the output index depends on the input data.
>>>>
>>>> Thanks
>>>>
>>>> On Thu, Jun 26, 2014 at 10:10 AM, Nick Pentreath <nick.pentre...@gmail.com> wrote:
>>>>
>>>>> You can just add elasticsearch-hadoop as a dependency to your project
>>>>> to use the ESInputFormat and ESOutputFormat
>>>>> (https://github.com/elasticsearch/elasticsearch-hadoop). Some other
>>>>> basics here:
>>>>> http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/current/spark.html
>>>>>
>>>>> For testing, yes, I think you will need to start ES in local mode (just
>>>>> ./bin/elasticsearch) and use the default config (host = localhost,
>>>>> port = 9200).
>>>>>
>>>>> On Thu, Jun 26, 2014 at 9:04 AM, boci <boci.b...@gmail.com> wrote:
>>>>>
>>>>>> That's okay, but Hadoop has ES integration. What happens if I run
>>>>>> saveAsHadoopFile without Hadoop? (Or do I need to pull up Hadoop
>>>>>> programmatically, if I can?)
>>>>>>
>>>>>> b0c1
>>>>>>
>>>>>> On Thu, Jun 26, 2014 at 1:20 AM, Holden Karau <hol...@pigscanfly.ca> wrote:
>>>>>>
>>>>>>> On Wed, Jun 25, 2014 at 4:16 PM, boci <boci.b...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi guys, thanks for the directions. Now I have some problems/questions:
>>>>>>>> - In local (test) mode I want to use ElasticClient.local to create the
>>>>>>>> ES connection, but in production I want to use ElasticClient.remote; to
>>>>>>>> do this I want to pass ElasticClient to mapPartitions. Or what is the
>>>>>>>> best practice?
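Adding the dependency Nick mentions could look like the following in sbt; the version shown is purely illustrative, so pick the release matching your Elasticsearch cluster:

```scala
// build.sbt (sketch; version is illustrative, not a recommendation)
libraryDependencies += "org.elasticsearch" % "elasticsearch-hadoop" % "2.0.0"
```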
>>>>>>>>
>>>>>>> In this case you probably want to make the ElasticClient inside of
>>>>>>> mapPartitions (since it isn't serializable), and if you want to use a
>>>>>>> different client in local mode, just have a flag that controls what
>>>>>>> type of client you create.
>>>>>>>
>>>>>>>> - My stream output is written into Elasticsearch. How can I test
>>>>>>>> output.saveAsHadoopFile[ESOutputFormat]("-") in a local environment?
>>>>>>>>
>>>>>>>> - After storing the enriched data in ES, I want to generate aggregated
>>>>>>>> data (EsInputFormat). How can I test that locally?
>>>>>>>>
>>>>>>> I think the simplest thing to do would be to use the same client in
>>>>>>> both modes and just start a single-node Elasticsearch cluster.
>>>>>>>
>>>>>>>> Thanks guys
>>>>>>>>
>>>>>>>> b0c1
>>>>>>>>
>>>>>>>> On Wed, Jun 25, 2014 at 1:33 AM, Holden Karau <hol...@pigscanfly.ca> wrote:
>>>>>>>>
>>>>>>>>> So I'm giving a talk at the Spark Summit on using Spark &
>>>>>>>>> ElasticSearch, but for now, if you want to see a simple demo which
>>>>>>>>> uses Elasticsearch for geo input, you can take a look at my quick &
>>>>>>>>> dirty implementation with TopTweetsInALocation (
>>>>>>>>> https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/TopTweetsInALocation.scala
>>>>>>>>> ). This approach uses the ESInputFormat, which avoids the difficulty
>>>>>>>>> of having to manually create ElasticSearch clients.
>>>>>>>>>
>>>>>>>>> This approach might not work for your data, e.g. if you need to
>>>>>>>>> create a query for each record in your RDD.
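Holden's flag-based suggestion could be sketched as follows, assuming the elastic4s-style API (`ElasticClient.local` / `ElasticClient.remote`) that boci mentions; `useLocal`, the host name, and the port are placeholders for your own configuration:

```scala
// Sketch: pick the client type with a flag, and build it inside
// mapPartitions so the non-serializable client never enters the closure.
def makeClient(useLocal: Boolean): ElasticClient =
  if (useLocal) ElasticClient.local                 // embedded node for tests
  else ElasticClient.remote("es-host", 9300)        // hypothetical production host

rdd.mapPartitions { iter =>
  val client = makeClient(useLocal)  // created once per partition, on the worker
  iter.map { doc =>
    // ... use client to enrich/index doc ...
    doc
  }
}
```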
>>>>>>>>> If this is the case, you could instead look at using mapPartitions
>>>>>>>>> and setting up your Elasticsearch connection inside of that, so you
>>>>>>>>> could then re-use the client for all of the queries on each
>>>>>>>>> partition. This approach will avoid having to serialize the
>>>>>>>>> Elasticsearch connection because it will be local to your function.
>>>>>>>>>
>>>>>>>>> Hope this helps!
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>>
>>>>>>>>> Holden :)
>>>>>>>>>
>>>>>>>>> On Tue, Jun 24, 2014 at 4:28 PM, Mayur Rustagi <mayur.rust...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> It's not used as the default serializer because of some issues with
>>>>>>>>>> compatibility & the requirement to register the classes.
>>>>>>>>>>
>>>>>>>>>> Which part are you getting as non-serializable? You need to
>>>>>>>>>> serialize that class if you are sending it to Spark workers inside a
>>>>>>>>>> map, reduce, mapPartitions or any of the other operations on an RDD.
>>>>>>>>>>
>>>>>>>>>> Mayur Rustagi
>>>>>>>>>> Ph: +1 (760) 203 3257
>>>>>>>>>> http://www.sigmoidanalytics.com
>>>>>>>>>> @mayur_rustagi <https://twitter.com/mayur_rustagi>
>>>>>>>>>>
>>>>>>>>>> On Wed, Jun 25, 2014 at 4:52 AM, Peng Cheng <pc...@uow.edu.au> wrote:
>>>>>>>>>>
>>>>>>>>>>> I'm afraid persisting a connection across two tasks is a dangerous
>>>>>>>>>>> act, as they can't be guaranteed to be executed on the same
>>>>>>>>>>> machine. Your ES server may think it's a man-in-the-middle attack!
>>>>>>>>>>>
>>>>>>>>>>> I think it's possible to invoke a static method that gives you a
>>>>>>>>>>> connection from a local 'pool', so nothing will sneak into your
>>>>>>>>>>> closure, but it's too complex and there should be a better option.
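Peng's "static method + local pool" idea is simpler in Scala than it may sound: a top-level `object` is a JVM-wide singleton that is never serialized with the closure, so each executor JVM lazily creates its own client on first use. A sketch, again assuming an elastic4s-style client and a hypothetical host:

```scala
// A JVM-local holder: only the *reference* is resolved inside the task,
// so no connection object ever travels with the serialized closure.
object ESClientHolder {
  lazy val client: ElasticClient =
    ElasticClient.remote("es-host", 9300) // hypothetical host/port
}

rdd.mapPartitions { iter =>
  val client = ESClientHolder.client // created once per executor JVM, reused across tasks
  iter.map { doc =>
    // ... query ES with client ...
    doc
  }
}
```

Note this reuses one connection per executor JVM rather than per task, which addresses Peng's concern: each machine talks to ES from its own stable connection.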
>>>>>>>>>>>
>>>>>>>>>>> Never used Kryo before; if it's that good, perhaps we should use
>>>>>>>>>>> it as the default serializer.
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> View this message in context:
>>>>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/ElasticSearch-enrich-tp8209p8222.html
>>>>>>>>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> --
> Cell : 425-233-8271
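For reference, opting into Kryo in the Spark 1.x era works roughly as below, per Mayur's note that it requires registering your classes; the registrator class name is a hypothetical placeholder for your own implementation:

```scala
import org.apache.spark.SparkConf

// Sketch: switch Spark to the Kryo serializer and point it at a custom
// registrator that registers your classes (Spark 1.0-era configuration).
val conf = new SparkConf()
  .setAppName("es-enrich")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "com.example.MyKryoRegistrator") // hypothetical class
```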