OK, I found dynamic resources, but I have a frustrating problem. This is the
flow:
kafka -> enrich X -> enrich Y -> enrich Z -> foreachRDD -> save

My problem is that this doesn't work: the enrich functions are not called.
If I add a print, they are. For example, if I do this:
kafka -> enrich X -> enrich Y -> print -> enrich Z -> foreachRDD

enrich X and enrich Y are called, but enrich Z is not. If I put the print
after enrich Z, it is printed. How can I solve this? (What can I do to get
foreachRDD called? I put a breakpoint inside the map function, where I
generate the writable, but it is never hit.)

Any ideas?

b0c1



----------------------------------------------------------------------------------------------------------------------------------
Skype: boci13, Hangout: boci.b...@gmail.com


On Fri, Jun 27, 2014 at 4:53 PM, boci <boci.b...@gmail.com> wrote:

> Another question. In the foreachRDD I will initialize the JobConf, but at
> that point how can I get information from the items?
> I have an identifier in the data that identifies the required ES index (so
> how can I set a dynamic index in the foreachRDD)?
>
> b0c1
>
>
> On Fri, Jun 27, 2014 at 12:30 AM, Holden Karau <hol...@pigscanfly.ca>
> wrote:
>
>> Just your luck I happened to be working on that very talk today :) Let me
>> know how your experiences with Elasticsearch & Spark go :)
>>
>>
>> On Thu, Jun 26, 2014 at 3:17 PM, boci <boci.b...@gmail.com> wrote:
>>
>>> Wow, thanks for your fast answer, it helps a lot...
>>>
>>> b0c1
>>>
>>>
>>> On Thu, Jun 26, 2014 at 11:48 PM, Holden Karau <hol...@pigscanfly.ca>
>>> wrote:
>>>
>>>> Hi b0c1,
>>>>
>>>> I have an example of how to do this in the repo for my talk as well,
>>>> the specific example is at
>>>> https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/IndexTweetsLive.scala
>>>> . Since DStream doesn't have a saveAsHadoopDataset we use foreachRDD and
>>>> then call saveAsHadoopDataset on the RDD that gets passed into the
>>>> function we provide to foreachRDD.
>>>>
>>>> e.g.
>>>>
>>>> stream.foreachRDD{(data, time) =>
>>>>      val jobConf = ...
>>>>      data.map(prepareDataForIndexing).saveAsHadoopDataset(jobConf)
>>>> }
>>>>
>>>> Hope that helps :)
>>>>
>>>> Cheers,
>>>>
>>>> Holden :)
>>>>
>>>>
>>>> On Thu, Jun 26, 2014 at 2:23 PM, boci <boci.b...@gmail.com> wrote:
>>>>
>>>>> Thanks. Without the local option I can connect to the remote ES; now I
>>>>> only have one problem. How can I use elasticsearch-hadoop with Spark
>>>>> Streaming? I mean, DStream doesn't have a "saveAsHadoopFiles" method. My
>>>>> second problem is that the output index depends on the input data.
>>>>>
>>>>> Thanks
>>>>>
>>>>>
>>>>> On Thu, Jun 26, 2014 at 10:10 AM, Nick Pentreath <
>>>>> nick.pentre...@gmail.com> wrote:
>>>>>
>>>>>> You can just add elasticsearch-hadoop as a dependency to your project
>>>>>> to use the ESInputFormat and ESOutputFormat (
>>>>>> https://github.com/elasticsearch/elasticsearch-hadoop). Some other
>>>>>> basics here:
>>>>>> http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/current/spark.html
>>>>>>
>>>>>> For testing, yes I think you will need to start ES in local mode
>>>>>> (just ./bin/elasticsearch) and use the default config (host = localhost,
>>>>>> port = 9200).
>>>>>>
>>>>>>
>>>>>> On Thu, Jun 26, 2014 at 9:04 AM, boci <boci.b...@gmail.com> wrote:
>>>>>>
>>>>>>> That's okay, but Hadoop has ES integration. What happens if I run
>>>>>>> saveAsHadoopFile without Hadoop? (Or do I need to start Hadoop
>>>>>>> programmatically, if I can?)
>>>>>>>
>>>>>>> b0c1
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Jun 26, 2014 at 1:20 AM, Holden Karau <hol...@pigscanfly.ca>
>>>>>>> wrote:
>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Jun 25, 2014 at 4:16 PM, boci <boci.b...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi guys, thanks for the direction. Now I have some problems/questions:
>>>>>>>>> - In local (test) mode I want to use ElasticClient.local to create the
>>>>>>>>> ES connection, but in production I want to use ElasticClient.remote.
>>>>>>>>> For this, should I pass the ElasticClient to mapPartitions, or what is
>>>>>>>>> the best practice?
>>>>>>>>>
>>>>>>>> In this case you probably want to make the ElasticClient inside of
>>>>>>>> mapPartitions (since it isn't serializable) and if you want to use a
>>>>>>>> different client in local mode just have a flag that controls what
>>>>>>>> type of client you create.
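
A minimal sketch of the flag-based approach described here, assuming a hypothetical `EsClient` trait in place of the real ElasticClient. The names `EsClient`, `LocalClient`, `RemoteClient`, `Enrichment`, `enrichPartition`, and `useLocal`, as well as the host and port, are illustrative only and do not come from any library. The client is built inside the per-partition function, so the closure captures only a Boolean rather than the client itself.

```scala
// Hypothetical stand-in for the real client; only the shape matters here.
trait EsClient {
  def enrich(record: String): String
  def close(): Unit
}

final class LocalClient extends EsClient {
  def enrich(record: String): String = s"local:$record"
  def close(): Unit = ()
}

final class RemoteClient(host: String, port: Int) extends EsClient {
  def enrich(record: String): String = s"$host:$port:$record"
  def close(): Unit = ()
}

object Enrichment {
  // Returns a function with the Iterator => Iterator shape that
  // RDD.mapPartitions expects. Only `useLocal` (a Boolean) is captured,
  // so nothing non-serializable has to be shipped to the workers.
  def enrichPartition(useLocal: Boolean): Iterator[String] => Iterator[String] =
    records => {
      // Created once per partition, on the worker that runs the task.
      // Host and port are placeholder values.
      val client: EsClient =
        if (useLocal) new LocalClient else new RemoteClient("es.example.com", 9300)
      try {
        // Materialize before closing: the iterator is lazy, so the client
        // must stay open until every record has been processed.
        records.map(client.enrich).toList.iterator
      } finally client.close()
    }
}
```

With an `RDD[String]` named `rdd`, this could be used as `rdd.mapPartitions(Enrichment.enrichPartition(useLocal = true))`. Materializing the partition with `toList` keeps the sketch simple but holds the whole partition in memory; for large partitions a different way of closing the client may be preferable.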
>>>>>>>>
>>>>>>>>> - My stream output is written into Elasticsearch. How can I
>>>>>>>>> test output.saveAsHadoopFile[ESOutputFormat]("-") in a local
>>>>>>>>> environment?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> - After storing the enriched data in ES, I want to generate
>>>>>>>>> aggregated data (EsInputFormat). How can I test that locally?
>>>>>>>>>
>>>>>>>> I think the simplest thing to do would be to use the same client in
>>>>>>>> both modes and just start a single-node Elasticsearch cluster.
>>>>>>>>
>>>>>>>>>
>>>>>>>>> Thanks guys
>>>>>>>>>
>>>>>>>>> b0c1
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, Jun 25, 2014 at 1:33 AM, Holden Karau <
>>>>>>>>> hol...@pigscanfly.ca> wrote:
>>>>>>>>>
>>>>>>>>>> So I'm giving a talk at the Spark summit on using Spark &
>>>>>>>>>> ElasticSearch, but for now if you want to see a simple demo which
>>>>>>>>>> uses elasticsearch for geo input you can take a look at my quick &
>>>>>>>>>> dirty implementation with TopTweetsInALocation (
>>>>>>>>>> https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/TopTweetsInALocation.scala
>>>>>>>>>> ). This approach uses the ESInputFormat which avoids the difficulty
>>>>>>>>>> of having to manually create ElasticSearch clients.
>>>>>>>>>>
>>>>>>>>>> This approach might not work for your data, e.g. if you need to
>>>>>>>>>> create a query for each record in your RDD. If this is the case, you
>>>>>>>>>> could instead look at using mapPartitions and setting up your
>>>>>>>>>> Elasticsearch connection inside of that, so you could then re-use the
>>>>>>>>>> client for all of the queries on each partition. This approach will
>>>>>>>>>> avoid having to serialize the Elasticsearch connection because it
>>>>>>>>>> will be local to your function.
>>>>>>>>>>
>>>>>>>>>> Hope this helps!
>>>>>>>>>>
>>>>>>>>>> Cheers,
>>>>>>>>>>
>>>>>>>>>> Holden :)
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Tue, Jun 24, 2014 at 4:28 PM, Mayur Rustagi <
>>>>>>>>>> mayur.rust...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> It's not used as the default serializer because of some compatibility
>>>>>>>>>>> issues and the requirement to register the classes.
>>>>>>>>>>>
>>>>>>>>>>> Which part are you getting as non-serializable? You need to make that
>>>>>>>>>>> class serializable if you are sending it to Spark workers inside a
>>>>>>>>>>> map, reduce, mapPartitions, or any other operation on an RDD.
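
When it is unclear which object is the non-serializable one, one option is to try Java serialization on each candidate directly, since Spark ships task closures with Java serialization. A minimal sketch follows; `SerializationCheck`, `isJavaSerializable`, `PlainConnection`, and `ShippableConfig` are hypothetical names for illustration and are not part of Spark.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object SerializationCheck {
  // True if `obj` (and everything it references) survives Java serialization.
  def isJavaSerializable(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    } finally out.close()
  }
}

// A plain class is not serializable; extending Serializable changes that,
// provided all of its fields are serializable too.
class PlainConnection(val host: String)
class ShippableConfig(val host: String) extends Serializable
```

Here `SerializationCheck.isJavaSerializable(new PlainConnection("localhost"))` is false, while the same call on `new ShippableConfig("localhost")` is true. Objects that fail the check, such as connections, are the ones to create on the worker instead of capturing them in a closure.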
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Mayur Rustagi
>>>>>>>>>>> Ph: +1 (760) 203 3257
>>>>>>>>>>> http://www.sigmoidanalytics.com
>>>>>>>>>>> @mayur_rustagi <https://twitter.com/mayur_rustagi>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Jun 25, 2014 at 4:52 AM, Peng Cheng <pc...@uow.edu.au>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I'm afraid persisting a connection across two tasks is dangerous, as
>>>>>>>>>>>> they can't be guaranteed to be executed on the same machine. Your ES
>>>>>>>>>>>> server may think it's a man-in-the-middle attack!
>>>>>>>>>>>>
>>>>>>>>>>>> I think it's possible to invoke a static method that gives you a
>>>>>>>>>>>> connection from a local 'pool', so nothing will sneak into your
>>>>>>>>>>>> closure, but it's too complex and there should be a better option.
>>>>>>>>>>>>
>>>>>>>>>>>> I've never used Kryo before; if it's that good, perhaps we should use
>>>>>>>>>>>> it as the default serializer.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> View this message in context:
>>>>>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/ElasticSearch-enrich-tp8209p8222.html
>>>>>>>>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>>>>>>>>> Nabble.com.
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Cell : 425-233-8271