Wow, thanks for your fast answer, it helps a lot...

b0c1
----------------------------------------------------------------------------------------------------------------------------------
Skype: boci13, Hangout: boci.b...@gmail.com


On Thu, Jun 26, 2014 at 11:48 PM, Holden Karau <hol...@pigscanfly.ca> wrote:

> Hi b0c1,
>
> I have an example of how to do this in the repo for my talk as well; the
> specific example is at
> https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/IndexTweetsLive.scala
> . Since DStream doesn't have a saveAsHadoopDataset method, we use foreachRDD
> and then call saveAsHadoopDataset on the RDD that gets passed into the
> function we provide to foreachRDD.
>
> e.g.
>
> stream.foreachRDD { (data, time) =>
>   val jobConf = ...
>   data.map(prepareDataForIndexing).saveAsHadoopDataset(jobConf)
> }
>
> Hope that helps :)
>
> Cheers,
>
> Holden :)
>
>
> On Thu, Jun 26, 2014 at 2:23 PM, boci <boci.b...@gmail.com> wrote:
>
>> Thanks. Without the local option I can connect to ES remotely. Now I only
>> have one problem: how can I use elasticsearch-hadoop with Spark Streaming?
>> I mean, DStream doesn't have a "saveAsHadoopFiles" method, and my second
>> problem is that the output index depends on the input data.
>>
>> Thanks
>>
>>
>> On Thu, Jun 26, 2014 at 10:10 AM, Nick Pentreath <
>> nick.pentre...@gmail.com> wrote:
>>
>>> You can just add elasticsearch-hadoop as a dependency to your project to
>>> use the ESInputFormat and ESOutputFormat (
>>> https://github.com/elasticsearch/elasticsearch-hadoop). Some other
>>> basics here:
>>> http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/current/spark.html
>>>
>>> For testing, yes, I think you will need to start ES in local mode (just
>>> ./bin/elasticsearch) and use the default config (host = localhost, port =
>>> 9200).
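[Editor's note: Holden's snippet above elides the JobConf setup. A fuller sketch of the same foreachRDD pattern, assuming elasticsearch-hadoop's EsOutputFormat; the property names come from the es-hadoop docs, and the `{field}` resource pattern (which addresses the "output index depends on the input data" question) requires a version of es-hadoop with dynamic resource support. The endpoint and index pattern are illustrative, not from the thread.]

```scala
import org.apache.hadoop.io.{MapWritable, NullWritable}
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.dstream.DStream
import org.elasticsearch.hadoop.mr.EsOutputFormat

def indexStream(stream: DStream[(NullWritable, MapWritable)]): Unit =
  stream.foreachRDD { (data, time) =>
    val jobConf = new JobConf()
    jobConf.setOutputFormat(classOf[EsOutputFormat])
    jobConf.set("es.nodes", "localhost:9200") // assumed ES endpoint
    // If your es-hadoop version supports dynamic resources, a {field}
    // pattern is resolved per document, so the target index can depend
    // on the input data itself.
    jobConf.set("es.resource.write", "tweets-{user}/tweet")
    data.saveAsHadoopDataset(jobConf)
  }
```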
>>>
>>>
>>> On Thu, Jun 26, 2014 at 9:04 AM, boci <boci.b...@gmail.com> wrote:
>>>
>>>> That's okay, but Hadoop has ES integration. What happens if I run
>>>> saveAsHadoopFile without Hadoop (or must I pull up Hadoop
>>>> programmatically, if I can)?
>>>>
>>>> b0c1
>>>>
>>>>
>>>> On Thu, Jun 26, 2014 at 1:20 AM, Holden Karau <hol...@pigscanfly.ca>
>>>> wrote:
>>>>
>>>>> On Wed, Jun 25, 2014 at 4:16 PM, boci <boci.b...@gmail.com> wrote:
>>>>>
>>>>>> Hi guys, thanks for the direction. Now I have some problems/questions:
>>>>>> - In local (test) mode I want to use ElasticClient.local to create the
>>>>>> ES connection, but in production I want to use ElasticClient.remote. For
>>>>>> this I want to pass the ElasticClient to mapPartitions, or what is the
>>>>>> best practice?
>>>>>
>>>>> In this case you probably want to create the ElasticClient inside of
>>>>> mapPartitions (since it isn't serializable), and if you want to use a
>>>>> different client in local mode, just have a flag that controls what type
>>>>> of client you create.
>>>>>
>>>>>> - My stream output is written to Elasticsearch. How can I test
>>>>>> output.saveAsHadoopFile[ESOutputFormat]("-") in a local environment?
>>>>>>
>>>>>> - After storing the enriched data in ES, I want to generate aggregated
>>>>>> data (EsInputFormat). How can I test it locally?
>>>>>
>>>>> I think the simplest thing to do would be to use the same client in both
>>>>> modes and just start a single-node Elasticsearch cluster.
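[Editor's note: a minimal sketch of Holden's suggestion, building the non-serializable client inside mapPartitions and switching local vs. remote with a flag. The ElasticClient.local / ElasticClient.remote calls follow the elastic4s API mentioned in the thread; the host, port, and enrichment step are placeholders.]

```scala
import org.apache.spark.rdd.RDD
import com.sksamuel.elastic4s.ElasticClient

def enrich(docs: RDD[String], useLocalEs: Boolean): RDD[String] =
  docs.mapPartitions { partition =>
    // Created once per partition on the worker, so the non-serializable
    // client is never captured in the task closure.
    val client =
      if (useLocalEs) ElasticClient.local
      else ElasticClient.remote("es-host", 9300) // assumed production endpoint
    partition.map { doc =>
      // ... query/enrich each doc, reusing the same client ...
      doc
    }
  }
```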
>>>>>
>>>>>>
>>>>>> Thanks guys
>>>>>>
>>>>>> b0c1
>>>>>>
>>>>>>
>>>>>> On Wed, Jun 25, 2014 at 1:33 AM, Holden Karau <hol...@pigscanfly.ca>
>>>>>> wrote:
>>>>>>
>>>>>>> So I'm giving a talk at the Spark Summit on using Spark &
>>>>>>> Elasticsearch, but for now, if you want to see a simple demo which uses
>>>>>>> Elasticsearch for geo input, you can take a look at my quick & dirty
>>>>>>> implementation in TopTweetsInALocation (
>>>>>>> https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/TopTweetsInALocation.scala
>>>>>>> ). This approach uses the ESInputFormat, which avoids the difficulty
>>>>>>> of having to manually create Elasticsearch clients.
>>>>>>>
>>>>>>> This approach might not work for your data, e.g. if you need to
>>>>>>> create a query for each record in your RDD. If this is the case, you
>>>>>>> could instead look at using mapPartitions and setting up your
>>>>>>> Elasticsearch connection inside of that, so you can then re-use the
>>>>>>> client for all of the queries on each partition. This approach avoids
>>>>>>> having to serialize the Elasticsearch connection because it will be
>>>>>>> local to your function.
>>>>>>>
>>>>>>> Hope this helps!
>>>>>>>
>>>>>>> Cheers,
>>>>>>>
>>>>>>> Holden :)
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Jun 24, 2014 at 4:28 PM, Mayur Rustagi <
>>>>>>> mayur.rust...@gmail.com> wrote:
>>>>>>>
>>>>>>>> It's not used as the default serializer because of some issues with
>>>>>>>> compatibility and the requirement to register the classes.
>>>>>>>>
>>>>>>>> Which part are you getting as non-serializable? You need to
>>>>>>>> serialize that class if you are sending it to Spark workers inside a
>>>>>>>> map, reduce, mapPartitions, or any of the other operations on an RDD.
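[Editor's note: a sketch of reading from Elasticsearch with the EsInputFormat, the approach TopTweetsInALocation takes. The key/value types and `es.*` property names follow the elasticsearch-hadoop docs; the index name and query are placeholders, and the exact signatures may vary by version.]

```scala
import org.apache.hadoop.io.{MapWritable, Text}
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.elasticsearch.hadoop.mr.EsInputFormat

def readTweets(sc: SparkContext): RDD[(Text, MapWritable)] = {
  val jobConf = new JobConf()
  jobConf.set("es.nodes", "localhost:9200") // assumed ES endpoint
  jobConf.set("es.resource", "twitter/tweet") // assumed index/type
  jobConf.set("es.query", "?q=*") // match-all query for illustration
  // No Elasticsearch client is created by hand: the input format handles
  // the connections, and each document arrives as a (Text, MapWritable).
  sc.hadoopRDD(jobConf, classOf[EsInputFormat[Text, MapWritable]],
    classOf[Text], classOf[MapWritable])
}
```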
>>>>>>>>
>>>>>>>>
>>>>>>>> Mayur Rustagi
>>>>>>>> Ph: +1 (760) 203 3257
>>>>>>>> http://www.sigmoidanalytics.com
>>>>>>>> @mayur_rustagi <https://twitter.com/mayur_rustagi>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Jun 25, 2014 at 4:52 AM, Peng Cheng <pc...@uow.edu.au>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> I'm afraid persisting a connection across two tasks is a dangerous
>>>>>>>>> act, as they can't be guaranteed to be executed on the same machine.
>>>>>>>>> Your ES server may think it's a man-in-the-middle attack!
>>>>>>>>>
>>>>>>>>> I think it's possible to invoke a static method that gives you a
>>>>>>>>> connection from a local 'pool', so nothing will sneak into your
>>>>>>>>> closure, but it's too complex and there should be a better option.
>>>>>>>>>
>>>>>>>>> I've never used Kryo before; if it's that good, perhaps we should
>>>>>>>>> use it as the default serializer.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> View this message in context:
>>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/ElasticSearch-enrich-tp8209p8222.html
>>>>>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>>>>>> Nabble.com.
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Cell : 425-233-8271
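[Editor's note: Kryo is opt-in for exactly the reasons Mayur gives, and enabling it means registering your classes. A minimal sketch using the standard Spark configuration keys; the app name and the registrator class `com.example.MyKryoRegistrator` are hypothetical.]

```scala
import org.apache.spark.SparkConf

// Enable Kryo explicitly; it is not the default serializer.
val conf = new SparkConf()
  .setAppName("es-enrich") // hypothetical app name
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // A registrator lists the classes Kryo should know about; unregistered
  // classes still work but are serialized less efficiently.
  .set("spark.kryo.registrator", "com.example.MyKryoRegistrator")
```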