Another question: I will initialize the JobConf inside foreachRDD, but at that
point how can I get information from the individual items?
Each record carries an identifier that determines the required ES index, so
how can I set the index dynamically inside foreachRDD?
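
One possible way to do this, sketched under assumptions rather than taken from the thread: collect the distinct index names present in each batch on the driver, then write one filtered subset per index with its own JobConf. The `(indexName, document)` pair layout, the `doc` type name and the `localhost` node are hypothetical. `es.resource` and `es.nodes` are elasticsearch-hadoop settings, and the output format class may be spelled `ESOutputFormat` in older releases.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{MapWritable, NullWritable, Text}
import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf}
import org.apache.spark.SparkContext._ // pair-RDD implicits (needed on older Spark releases)
import org.apache.spark.streaming.dstream.DStream

object DynamicIndexSketch {
  // Build a JobConf that targets exactly one index.
  // The "doc" type and the "localhost" node are placeholders.
  def jobConfFor(base: Configuration, index: String): JobConf = {
    val jobConf = new JobConf(base)
    jobConf.set("mapred.output.format.class", "org.elasticsearch.hadoop.mr.EsOutputFormat")
    jobConf.setOutputCommitter(classOf[FileOutputCommitter])
    jobConf.set("es.resource", s"$index/doc")
    jobConf.set("es.nodes", "localhost")
    // The path is a dummy value; the data goes to Elasticsearch, not to a file.
    FileOutputFormat.setOutputPath(jobConf, new Path("-"))
    jobConf
  }

  // Convert a plain Scala map into the Writable form used for the output values.
  def toWritable(doc: Map[String, String]): MapWritable = {
    val out = new MapWritable()
    doc.foreach { case (k, v) => out.put(new Text(k), new Text(v)) }
    out
  }

  // Each element is assumed to be (indexName, document).
  def saveByIndex(stream: DStream[(String, Map[String, String])]): Unit =
    stream.foreachRDD { rdd =>
      rdd.cache() // the batch is scanned once per index below
      val base = rdd.sparkContext.hadoopConfiguration
      // Only the small set of distinct index names is brought back to the driver.
      rdd.keys.distinct().collect().foreach { index =>
        rdd.filter { case (key, _) => key == index }
          .map { case (_, doc) => (NullWritable.get(), toWritable(doc)) }
          .saveAsHadoopDataset(jobConfFor(base, index))
      }
      rdd.unpersist()
    }
}
```

This trades one extra pass over the batch per distinct index for simplicity, so it is only reasonable when the number of indices per batch is small. Newer elasticsearch-hadoop releases also document a field pattern in the write resource (for example `my-collection/{media_type}`), which may remove the need for the loop; check the documentation for the version in use.
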

b0c1

----------------------------------------------------------------------------------------------------------------------------------
Skype: boci13, Hangout: boci.b...@gmail.com


On Fri, Jun 27, 2014 at 12:30 AM, Holden Karau <hol...@pigscanfly.ca> wrote:

> Just your luck I happened to be working on that very talk today :) Let me
> know how your experiences with Elasticsearch & Spark go :)
>
>
> On Thu, Jun 26, 2014 at 3:17 PM, boci <boci.b...@gmail.com> wrote:
>
>> Wow, thanks for your fast answer, it helps a lot...
>>
>> b0c1
>>
>> On Thu, Jun 26, 2014 at 11:48 PM, Holden Karau <hol...@pigscanfly.ca>
>> wrote:
>>
>>> Hi b0c1,
>>>
>>> I have an example of how to do this in the repo for my talk as well; the
>>> specific example is at
>>> https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/IndexTweetsLive.scala
>>> . Since DStream doesn't have a saveAsHadoopDataset method, we use foreachRDD
>>> and then call saveAsHadoopDataset on the RDD that gets passed into the
>>> function we provide to foreachRDD.
>>>
>>> e.g.
>>>
>>> stream.foreachRDD{(data, time) =>
>>>      // JobConf describing the Elasticsearch output (details elided)
>>>      val jobConf = ...
>>>      data.map(prepareDataForIndexing).saveAsHadoopDataset(jobConf)
>>> }
>>>
>>> Hope that helps :)
>>>
>>> Cheers,
>>>
>>> Holden :)
>>>
>>>
>>> On Thu, Jun 26, 2014 at 2:23 PM, boci <boci.b...@gmail.com> wrote:
>>>
>>>> Thanks. Without the local option I can connect to the remote ES; now I only
>>>> have one problem: how can I use elasticsearch-hadoop with Spark Streaming?
>>>> I mean, DStream doesn't have a "saveAsHadoopFiles" method. My second problem
>>>> is that the output index depends on the input data.
>>>>
>>>> Thanks
>>>>
>>>> On Thu, Jun 26, 2014 at 10:10 AM, Nick Pentreath <
>>>> nick.pentre...@gmail.com> wrote:
>>>>
>>>>> You can just add elasticsearch-hadoop as a dependency to your project
>>>>> to use the ESInputFormat and ESOutputFormat (
>>>>> https://github.com/elasticsearch/elasticsearch-hadoop). Some other
>>>>> basics here:
>>>>> http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/current/spark.html
>>>>>
>>>>> For testing, yes I think you will need to start ES in local mode (just
>>>>> ./bin/elasticsearch) and use the default config (host = localhost, port =
>>>>> 9200).
>>>>>
>>>>>
>>>>> On Thu, Jun 26, 2014 at 9:04 AM, boci <boci.b...@gmail.com> wrote:
>>>>>
>>>>>> That's okay, but Hadoop has ES integration. What happens if I run
>>>>>> saveAsHadoopFile without Hadoop? Or do I need to start Hadoop
>>>>>> programmatically (if I can)?
>>>>>>
>>>>>> b0c1
>>>>>>
>>>>>> On Thu, Jun 26, 2014 at 1:20 AM, Holden Karau <hol...@pigscanfly.ca>
>>>>>> wrote:
>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Jun 25, 2014 at 4:16 PM, boci <boci.b...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi guys, thanks for the direction. Now I have some problems/questions:
>>>>>>>> - In local (test) mode I want to use ElasticClient.local to create the
>>>>>>>> ES connection, but in production I want to use ElasticClient.remote. To
>>>>>>>> do this I want to pass the ElasticClient to mapPartitions, or what is
>>>>>>>> the best practice?
>>>>>>>>
>>>>>>> In this case you probably want to make the ElasticClient inside of
>>>>>>> mapPartitions (since it isn't serializable), and if you want to use a
>>>>>>> different client in local mode just have a flag that controls what type
>>>>>>> of client you create.
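
A minimal sketch of the flag idea, assuming a hypothetical `SearchClient` trait in place of the real client library (none of these names come from the thread or from any Elasticsearch API). Only the flag and the connection settings, which are plain serializable values, need to reach the workers; the client itself is built where it is used.

```scala
// Hypothetical stand-in for whatever Elasticsearch client library is used.
trait SearchClient {
  def describe: String
  def close(): Unit
}

object SearchClients {
  // Placeholder for an embedded/local client used in tests.
  final class LocalClient extends SearchClient {
    def describe: String = "local"
    def close(): Unit = ()
  }

  // Placeholder for a client that connects to a remote cluster.
  final class RemoteClient(host: String, port: Int) extends SearchClient {
    def describe: String = s"remote($host:$port)"
    def close(): Unit = ()
  }

  // The flag decides which kind of client is created.
  def create(useLocal: Boolean, host: String, port: Int): SearchClient =
    if (useLocal) new LocalClient else new RemoteClient(host, port)
}

object FlagSketch {
  def main(args: Array[String]): Unit = {
    // In a Spark job this call would sit inside the function passed to
    // mapPartitions, so that the client is constructed on the worker.
    val useLocal = args.headOption.exists(_ == "local")
    val client = SearchClients.create(useLocal, "es.example.com", 9300)
    try println(client.describe)
    finally client.close()
  }
}
```
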
>>>>>>>
>>>>>>>> - My stream output is written to Elasticsearch. How can I
>>>>>>>> test output.saveAsHadoopFile[ESOutputFormat]("-") in a local environment?
>>>>>>>>
>>>>>>>> - After storing the enriched data in ES, I want to generate
>>>>>>>> aggregated data (EsInputFormat). How can I test it locally?
>>>>>>>>
>>>>>>> I think the simplest thing to do would be to use the same client in
>>>>>>> local mode and just start a single-node Elasticsearch cluster.
>>>>>>>
>>>>>>>>
>>>>>>>> Thanks guys
>>>>>>>>
>>>>>>>> b0c1
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Jun 25, 2014 at 1:33 AM, Holden Karau <hol...@pigscanfly.ca
>>>>>>>> > wrote:
>>>>>>>>
>>>>>>>>> So I'm giving a talk at the Spark summit on using Spark &
>>>>>>>>> ElasticSearch, but for now if you want to see a simple demo which uses
>>>>>>>>> elasticsearch for geo input you can take a look at my quick & dirty
>>>>>>>>> implementation with TopTweetsInALocation (
>>>>>>>>> https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/TopTweetsInALocation.scala
>>>>>>>>> ). This approach uses the ESInputFormat which avoids the difficulty of
>>>>>>>>> having to manually create ElasticSearch clients.
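
As a rough illustration of the input-format route (not the code from the linked repository): the sketch below reads an index into an RDD through `hadoopRDD`. The `twitter/tweet` resource, the query and the `localhost` node are assumptions, and older elasticsearch-hadoop releases may spell the class `ESInputFormat`.

```scala
import org.apache.hadoop.io.{MapWritable, Text}
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.hadoop.mr.EsInputFormat

object ReadFromEsSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("es-read-sketch").setMaster("local[2]"))

    val jobConf = new JobConf(sc.hadoopConfiguration)
    jobConf.set("es.resource", "twitter/tweet") // hypothetical index/type
    jobConf.set("es.nodes", "localhost")        // assumes a local node on the default port
    jobConf.set("es.query", "?q=*")             // hypothetical query: match everything

    // Keys are document ids, values are the document fields.
    val docs = sc.hadoopRDD(jobConf,
      classOf[EsInputFormat[Text, MapWritable]], classOf[Text], classOf[MapWritable])

    // Copy out of the Writables right away, since Hadoop input formats may reuse them.
    val ids = docs.map { case (id, _) => id.toString }
    println(ids.count())

    sc.stop()
  }
}
```

No client object is created by hand here; the connection details live entirely in the JobConf.
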
>>>>>>>>>
>>>>>>>>> This approach might not work for your data, e.g. if you need to
>>>>>>>>> create a query for each record in your RDD. If this is the case, you
>>>>>>>>> could instead look at using mapPartitions and setting up your
>>>>>>>>> Elasticsearch connection inside of that, so you could then re-use the
>>>>>>>>> client for all of the queries on each partition. This approach will
>>>>>>>>> avoid having to serialize the Elasticsearch connection because it will
>>>>>>>>> be local to your function.
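
The mapPartitions pattern described above might look roughly like this. `EnrichClient` is a hypothetical placeholder for a real Elasticsearch client, and its `query` method only fakes a lookup.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical client; deliberately not Serializable, like a real connection.
class EnrichClient(host: String) {
  def query(term: String): String = s"$term@$host" // placeholder for a real ES query
  def close(): Unit = ()
}

object MapPartitionsSketch {
  def enrich(terms: RDD[String], host: String): RDD[String] =
    terms.mapPartitions { iter =>
      // Created on the worker, once per partition, and reused for every record.
      val client = new EnrichClient(host)
      try {
        // The iterator is lazy, so materialize the results before closing the client.
        iter.map(client.query).toVector.iterator
      } finally {
        client.close()
      }
    }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("enrich-sketch").setMaster("local[2]"))
    enrich(sc.parallelize(Seq("spark", "es")), "localhost").collect().foreach(println)
    sc.stop()
  }
}
```

Only the `host` string is captured by the closure, so nothing non-serializable is shipped to the workers. Materializing each partition costs memory; for large partitions a wrapper iterator that closes the client once it is exhausted would be an alternative.
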
>>>>>>>>>
>>>>>>>>> Hope this helps!
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>>
>>>>>>>>> Holden :)
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Tue, Jun 24, 2014 at 4:28 PM, Mayur Rustagi <
>>>>>>>>> mayur.rust...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Kryo is not used as the default serializer because of some
>>>>>>>>>> compatibility issues and the requirement to register classes.
>>>>>>>>>>
>>>>>>>>>> Which part are you getting as non-serializable? You need to make that
>>>>>>>>>> class serializable if you are sending it to Spark workers inside a map,
>>>>>>>>>> reduce, mapPartitions, or any other operation on an RDD.
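
For reference, a minimal sketch of switching on Kryo and registering a class, assuming a hypothetical `Tweet` record type. The `spark.serializer` and `spark.kryo.registrator` settings are standard Spark configuration keys.

```scala
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical record type that gets shuffled between nodes.
case class Tweet(id: Long, text: String)

// Registers the classes Kryo should know about up front.
class SketchRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[Tweet])
  }
}

object KryoSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("kryo-sketch")
      .setMaster("local[2]")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator", classOf[SketchRegistrator].getName)
    val sc = new SparkContext(conf)

    // repartition forces a shuffle, so the records pass through the serializer.
    val tweets = sc.parallelize(Seq(Tweet(1L, "spark"), Tweet(2L, "es")))
    println(tweets.repartition(2).count())

    sc.stop()
  }
}
```

Note that this setting covers data that is shuffled or cached in serialized form. Closures are still shipped with Java serialization, so it does not by itself make a client object captured in a closure serializable.
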
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Mayur Rustagi
>>>>>>>>>> Ph: +1 (760) 203 3257
>>>>>>>>>> http://www.sigmoidanalytics.com
>>>>>>>>>> @mayur_rustagi <https://twitter.com/mayur_rustagi>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Wed, Jun 25, 2014 at 4:52 AM, Peng Cheng <pc...@uow.edu.au>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> I'm afraid persisting a connection across two tasks is dangerous, as
>>>>>>>>>>> they can't be guaranteed to be executed on the same machine. Your ES
>>>>>>>>>>> server may think it's a man-in-the-middle attack!
>>>>>>>>>>>
>>>>>>>>>>> I think it's possible to invoke a static method that gives you a
>>>>>>>>>>> connection from a local 'pool', so nothing will sneak into your
>>>>>>>>>>> closure, but it's too complex and there should be a better option.
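
The static 'pool' idea can be approximated with a Scala singleton object, sketched here with a hypothetical `EsConnection` class. A `lazy val` inside an `object` is initialized once per JVM, so each worker process builds its own connection and nothing is captured by the closure.

```scala
import org.apache.spark.rdd.RDD

// Hypothetical connection type; a real one would talk to Elasticsearch.
class EsConnection(val host: String) {
  def index(doc: String): Unit = () // placeholder for a real indexing call
}

object ConnectionHolder {
  // Initialized on first use in each JVM and never serialized.
  // The host is a placeholder.
  lazy val connection: EsConnection = new EsConnection("localhost")
}

object PoolSketch {
  def indexAll(docs: RDD[String]): Unit =
    docs.foreachPartition { part =>
      // Resolved statically on the worker; the closure captures no connection.
      val conn = ConnectionHolder.connection
      part.foreach(conn.index)
    }
}
```

The connection is shared by all tasks running in the same JVM, so this only works if the client is thread-safe.
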
>>>>>>>>>>>
>>>>>>>>>>> I've never used Kryo before; if it's that good, perhaps we should use
>>>>>>>>>>> it as the default serializer.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> View this message in context:
>>>>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/ElasticSearch-enrich-tp8209p8222.html
>>>>>>>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>>>>>>>> Nabble.com.
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Cell : 425-233-8271