So a few quick questions: 1) What cluster are you running this against? Is it just local? Have you tried local[4]? 2) When you say breakpoint, how are you setting this breakpoint? There is a good chance your breakpoint mechanism doesn't work in a distributed environment; could you instead cause a side effect (like writing to a file)?
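To make the side-effect idea concrete, here is a minimal sketch (traceEnrichZ, enrichZ, and the accumulator are illustrative placeholders, not your actual code): an accumulator bumped inside the map is observable from the driver even though the map runs on remote executors, and the count() inside foreachRDD is an action, so the transformations above it actually execute.

import org.apache.spark.SparkContext._
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
import scala.reflect.ClassTag

// Count how often the enrich step really runs on the executors.
def traceEnrichZ[T: ClassTag](ssc: StreamingContext, in: DStream[T], enrichZ: T => T): DStream[T] = {
  val calls = ssc.sparkContext.accumulator(0L)
  val out = in.map { record =>
    calls += 1L            // side effect visible from the driver, unlike an IDE breakpoint
    enrichZ(record)
  }
  out.foreachRDD { rdd =>
    rdd.count()            // an action, so the lineage above is actually evaluated
    println("enrichZ invocations so far: " + calls.value)
  }
  out
}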
Cheers,
Holden :)

On Fri, Jun 27, 2014 at 2:04 PM, boci <boci.b...@gmail.com> wrote:

> Ok, I found dynamic resources, but I have a frustrating problem. This is the flow:
>
> kafka -> enrich X -> enrich Y -> enrich Z -> foreachRDD -> save
>
> My problem is: if I do this it doesn't work, the enrich functions are not called, but if I put in a print they are. For example, if I do this:
>
> kafka -> enrich X -> enrich Y -> print -> enrich Z -> foreachRDD
>
> then enrich X and enrich Y are called but enrich Z is not; if I put the print after enrich Z, it gets printed. How can I solve this? (What can I do to get the foreachRDD called? I put a breakpoint inside the map function (where I generate the writable) but it is never hit.)
>
> Any idea?
>
> b0c1
>
> ----------------------------------------------------------------------------------------------------------------------------------
> Skype: boci13, Hangout: boci.b...@gmail.com
>
>
> On Fri, Jun 27, 2014 at 4:53 PM, boci <boci.b...@gmail.com> wrote:
>
>> Another question: in the foreachRDD I will initialize the JobConf, but at that point how can I get information from the items?
>> I have an identifier in the data which identifies the required ES index, so how can I set a dynamic index in the foreachRDD?
>>
>> b0c1
>>
>> ----------------------------------------------------------------------------------------------------------------------------------
>> Skype: boci13, Hangout: boci.b...@gmail.com
>>
>>
>> On Fri, Jun 27, 2014 at 12:30 AM, Holden Karau <hol...@pigscanfly.ca> wrote:
>>
>>> Just your luck, I happened to be working on that very talk today :) Let me know how your experiences with Elasticsearch & Spark go :)
>>>
>>>
>>> On Thu, Jun 26, 2014 at 3:17 PM, boci <boci.b...@gmail.com> wrote:
>>>
>>>> Wow, thanks for your fast answer, it helps a lot...
>>>>
>>>> b0c1
>>>>
>>>> ----------------------------------------------------------------------------------------------------------------------------------
>>>> Skype: boci13, Hangout: boci.b...@gmail.com
>>>>
>>>>
>>>> On Thu, Jun 26, 2014 at 11:48 PM, Holden Karau <hol...@pigscanfly.ca> wrote:
>>>>
>>>>> Hi b0c1,
>>>>>
>>>>> I have an example of how to do this in the repo for my talk as well; the specific example is at
>>>>> https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/IndexTweetsLive.scala
>>>>> Since DStream doesn't have a saveAsHadoopDataset, we use foreachRDD and then call saveAsHadoopDataset on the RDD that gets passed into the function we provide to foreachRDD.
>>>>>
>>>>> e.g.
>>>>>
>>>>> stream.foreachRDD{(data, time) =>
>>>>>   val jobConf = ...
>>>>>   data.map(prepareDataForIndexing).saveAsHadoopDataset(jobConf)
>>>>> }
>>>>>
>>>>> Hope that helps :)
>>>>>
>>>>> Cheers,
>>>>>
>>>>> Holden :)
>>>>>
>>>>>
>>>>> On Thu, Jun 26, 2014 at 2:23 PM, boci <boci.b...@gmail.com> wrote:
>>>>>
>>>>>> Thanks. Without the local option I can connect to ES remotely; now I only have one problem: how can I use elasticsearch-hadoop with Spark Streaming? I mean, DStream doesn't have a "saveAsHadoopFiles" method, and my second problem is that the output index depends on the input data.
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>> ----------------------------------------------------------------------------------------------------------------------------------
>>>>>> Skype: boci13, Hangout: boci.b...@gmail.com
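For reference, a rough sketch of the foreachRDD + saveAsHadoopDataset pattern above, wired up for elasticsearch-hadoop's EsOutputFormat and using its dynamic ("multi resource") write to pick the index per record, which is what the dynamic-index question is about. The index pattern, the media_type field, and the record shape are made-up placeholders, and depending on your es-hadoop version a few more JobConf settings may be needed, so treat this as a starting point rather than exact code:

import org.apache.hadoop.io.{MapWritable, NullWritable, Text}
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.dstream.DStream

// `stream` stands in for the enriched DStream of simple key/value records.
def saveToEs(stream: DStream[Map[String, String]]): Unit = {
  stream.foreachRDD { rdd =>
    val jobConf = new JobConf(rdd.context.hadoopConfiguration)
    jobConf.set("mapred.output.format.class", "org.elasticsearch.hadoop.mr.EsOutputFormat")
    jobConf.set("es.nodes", "localhost:9200")
    // Multi-resource write: each record's media_type field (hypothetical name)
    // selects the target index, so the index can vary per document.
    jobConf.set("es.resource.write", "events-{media_type}/event")
    // Depending on the es-hadoop / Hadoop version you may also need a dummy
    // output path, e.g. FileOutputFormat.setOutputPath(jobConf, new Path("-")).

    rdd.map { record =>
      val doc = new MapWritable()
      record.foreach { case (k, v) => doc.put(new Text(k), new Text(v)) }
      (NullWritable.get(), doc)        // the key is ignored by EsOutputFormat
    }.saveAsHadoopDataset(jobConf)
  }
}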
>>>>>>
>>>>>> On Thu, Jun 26, 2014 at 10:10 AM, Nick Pentreath <nick.pentre...@gmail.com> wrote:
>>>>>>
>>>>>>> You can just add elasticsearch-hadoop as a dependency to your project to use the ESInputFormat and ESOutputFormat
>>>>>>> (https://github.com/elasticsearch/elasticsearch-hadoop). Some other basics here:
>>>>>>> http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/current/spark.html
>>>>>>>
>>>>>>> For testing, yes, I think you will need to start ES in local mode (just ./bin/elasticsearch) and use the default config (host = localhost, port = 9200).
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Jun 26, 2014 at 9:04 AM, boci <boci.b...@gmail.com> wrote:
>>>>>>>
>>>>>>>> That's okay, but Hadoop has ES integration. What happens if I run saveAsHadoopFile without Hadoop (or do I need to pull up Hadoop programmatically, if I can)?
>>>>>>>>
>>>>>>>> b0c1
>>>>>>>>
>>>>>>>> ----------------------------------------------------------------------------------------------------------------------------------
>>>>>>>> Skype: boci13, Hangout: boci.b...@gmail.com
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Jun 26, 2014 at 1:20 AM, Holden Karau <hol...@pigscanfly.ca> wrote:
>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, Jun 25, 2014 at 4:16 PM, boci <boci.b...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi guys, thanks for the direction. Now I have some problems/questions:
>>>>>>>>>> - in local (test) mode I want to use ElasticClient.local to create the ES connection, but in production I want to use ElasticClient.remote. To do this I want to pass the ElasticClient to mapPartitions, or what is the best practice?
>>>>>>>>>>
>>>>>>>>> In this case you probably want to make the ElasticClient inside of mapPartitions (since it isn't serializable), and if you want to use a different client in local mode, just have a flag that controls what type of client you create.
>>>>>>>>>
>>>>>>>>>> - my stream output is written into Elasticsearch. How can I test output.saveAsHadoopFile[ESOutputFormat]("-") in a local environment?
>>>>>>>>>>
>>>>>>>>>> - after storing the enriched data in ES, I want to generate aggregated data (EsInputFormat); how can I test that locally?
>>>>>>>>>>
>>>>>>>>> I think the simplest thing to do would be to use the same client in both modes and just start a single-node Elasticsearch cluster locally.
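A hedged sketch of that advice, assuming the elastic4s ElasticClient mentioned above; the helper name, host, port, and the exact ElasticClient.local / ElasticClient.remote signatures are illustrative and depend on your elastic4s version:

import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD
import com.sksamuel.elastic4s.ElasticClient

// Build the non-serializable client on the executor, once per partition.
// `useLocalEs` is the flag controlling local vs. remote; `indexDoc` is a
// placeholder for whatever you do with the client per record.
def enrichWithEs[T: ClassTag](rdd: RDD[T], useLocalEs: Boolean)(indexDoc: (ElasticClient, T) => T): RDD[T] = {
  rdd.mapPartitions { records =>
    val client =
      if (useLocalEs) ElasticClient.local                    // test mode
      else ElasticClient.remote("es-host.example.com", 9300) // production (hypothetical host)
    records.map(record => indexDoc(client, record))
    // (you may also want to close the client once the iterator is exhausted)
  }
}

Only the useLocalEs flag travels with the closure; the client itself is constructed on the executor, which is exactly what avoids serializing the connection.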
>>>>>>>>>
>>>>>>>>>> Thanks guys
>>>>>>>>>>
>>>>>>>>>> b0c1
>>>>>>>>>>
>>>>>>>>>> ----------------------------------------------------------------------------------------------------------------------------------
>>>>>>>>>> Skype: boci13, Hangout: boci.b...@gmail.com
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Wed, Jun 25, 2014 at 1:33 AM, Holden Karau <hol...@pigscanfly.ca> wrote:
>>>>>>>>>>
>>>>>>>>>>> So I'm giving a talk at the Spark Summit on using Spark & ElasticSearch, but for now, if you want to see a simple demo which uses Elasticsearch for geo input, you can take a look at my quick & dirty implementation with TopTweetsInALocation
>>>>>>>>>>> (https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/TopTweetsInALocation.scala).
>>>>>>>>>>> This approach uses the ESInputFormat, which avoids the difficulty of having to manually create ElasticSearch clients.
>>>>>>>>>>>
>>>>>>>>>>> This approach might not work for your data, e.g. if you need to create a query for each record in your RDD. If this is the case, you could instead look at using mapPartitions and setting up your Elasticsearch connection inside of that, so you could then re-use the client for all of the queries on each partition. This approach will avoid having to serialize the Elasticsearch connection because it will be local to your function.
>>>>>>>>>>>
>>>>>>>>>>> Hope this helps!
>>>>>>>>>>>
>>>>>>>>>>> Cheers,
>>>>>>>>>>>
>>>>>>>>>>> Holden :)
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Jun 24, 2014 at 4:28 PM, Mayur Rustagi <mayur.rust...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> It's not used as the default serializer because of some issues with compatibility & the requirement to register the classes.
>>>>>>>>>>>>
>>>>>>>>>>>> Which part are you getting as non-serializable? You need to serialize that class if you are sending it to Spark workers inside a map, reduce, mapPartitions or any of the other operations on an RDD.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Mayur Rustagi
>>>>>>>>>>>> Ph: +1 (760) 203 3257
>>>>>>>>>>>> http://www.sigmoidanalytics.com
>>>>>>>>>>>> @mayur_rustagi <https://twitter.com/mayur_rustagi>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Jun 25, 2014 at 4:52 AM, Peng Cheng <pc...@uow.edu.au> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> I'm afraid persisting a connection across two tasks is a dangerous act, as they can't be guaranteed to be executed on the same machine. Your ES server may think it's a man-in-the-middle attack!
>>>>>>>>>>>>>
>>>>>>>>>>>>> I think it's possible to invoke a static method that gives you a connection from a local 'pool', so nothing will sneak into your closure, but that's too complex and there should be a better option.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I've never used Kryo before; if it's that good, perhaps we should use it as the default serializer.
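A minimal sketch of that static-method / local-pool idea: a Scala singleton object is initialised lazily and independently inside each executor JVM, so the live connection is never captured by the serialized closure. The ElasticClient factory call and host are placeholders and depend on the elastic4s version:

import com.sksamuel.elastic4s.ElasticClient

// One client per executor JVM; referencing EsClientPool.client inside a task
// is a static access, so no connection object travels with the closure.
object EsClientPool {
  lazy val client: ElasticClient = ElasticClient.remote("es-host.example.com", 9300) // hypothetical host
}

// Usage inside a transformation, e.g. (doIndex is a placeholder helper):
// rdd.mapPartitions { records => records.map(r => doIndex(EsClientPool.client, r)) }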
>>>>>>>>>>>>>
>>>>>>>>>>>>> --
>>>>>>>>>>>>> View this message in context:
>>>>>>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/ElasticSearch-enrich-tp8209p8222.html
>>>>>>>>>>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.

--
Cell : 425-233-8271