Thanks, more local threads solved the problem, it works like a charm. How many threads are required? Adrian: it's not a public project, but ask and I will answer (if I can)... maybe later I will create a demo project based on my solution.
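For reference, the relevant part of my test setup now looks roughly like this (a minimal sketch; the app name and batch interval are just placeholders):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // With "local" the single available thread is taken by the Kafka receiver,
    // so nothing is left to run the enrich/foreachRDD stages and the job looks
    // like it does nothing. "local[4]" (anything >= 2) leaves threads for processing.
    val conf = new SparkConf()
      .setMaster("local[4]")
      .setAppName("es-enrich-test")            // placeholder name
    val ssc = new StreamingContext(conf, Seconds(5))  // batch interval is an assumption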
b0c1

On Fri, Jun 27, 2014 at 11:31 PM, Holden Karau <hol...@pigscanfly.ca> wrote:

> Try setting the master to local[4]
>
> On Fri, Jun 27, 2014 at 2:17 PM, boci <boci.b...@gmail.com> wrote:
>
>> It's a simple scalatest. I create a SparkConf, set the master to local
>> (set the serializer etc.), pull up the Kafka and ES connections, send a
>> message to Kafka and wait 30 seconds for it to be processed.
>>
>> It runs in IDEA, no magic tricks.
>>
>> b0c1
>>
>> On Fri, Jun 27, 2014 at 11:11 PM, Holden Karau <hol...@pigscanfly.ca> wrote:
>>
>>> So a few quick questions:
>>>
>>> 1) What cluster are you running this against? Is it just local? Have you
>>> tried local[4]?
>>> 2) When you say breakpoint, how are you setting this breakpoint? There
>>> is a good chance your breakpoint mechanism doesn't work in a distributed
>>> environment; could you instead cause a side effect (like writing to a file)?
>>>
>>> Cheers,
>>>
>>> Holden :)
>>>
>>> On Fri, Jun 27, 2014 at 2:04 PM, boci <boci.b...@gmail.com> wrote:
>>>
>>>> OK, I found dynamic resources, but I have a frustrating problem. This is
>>>> the flow:
>>>> kafka -> enrich X -> enrich Y -> enrich Z -> foreachRDD -> save
>>>>
>>>> My problem is: if I do this it doesn't work, the enrich functions are not
>>>> called, but if I put a print in, they are. For example, if I do this:
>>>> kafka -> enrich X -> enrich Y -> print -> enrich Z -> foreachRDD
>>>>
>>>> then enrich X and enrich Y are called but enrich Z is not;
>>>> if I put the print after enrich Z, it is printed. How can I solve this?
>>>> (What can I do to get the foreachRDD called? I put a breakpoint inside
>>>> the map function (where I generate the writable) but it's never hit.)
>>>>
>>>> Any idea?
>>>>
>>>> b0c1
>>>>
>>>> On Fri, Jun 27, 2014 at 4:53 PM, boci <boci.b...@gmail.com> wrote:
>>>>
>>>>> Another question. In the foreachRDD I will initialize the JobConf, but
>>>>> at that point how can I get information from the items?
>>>>> I have an identifier in the data which identifies the required ES index
>>>>> (so how can I set a dynamic index in the foreachRDD)?
>>>>>
>>>>> b0c1
>>>>>
>>>>> On Fri, Jun 27, 2014 at 12:30 AM, Holden Karau <hol...@pigscanfly.ca> wrote:
>>>>>
>>>>>> Just your luck, I happened to be working on that very talk today :)
>>>>>> Let me know how your experiences with Elasticsearch & Spark go :)
>>>>>>
>>>>>> On Thu, Jun 26, 2014 at 3:17 PM, boci <boci.b...@gmail.com> wrote:
>>>>>>
>>>>>>> Wow, thanks for your fast answer, it helps a lot...
>>>>>>>
>>>>>>> b0c1
>>>>>>>
>>>>>>> On Thu, Jun 26, 2014 at 11:48 PM, Holden Karau <hol...@pigscanfly.ca> wrote:
>>>>>>>
>>>>>>>> Hi b0c1,
>>>>>>>>
>>>>>>>> I have an example of how to do this in the repo for my talk as
>>>>>>>> well; the specific example is at
>>>>>>>> https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/IndexTweetsLive.scala .
>>>>>>>> Since DStream doesn't have a saveAsHadoopDataset, we use foreachRDD and
>>>>>>>> then call saveAsHadoopDataset on the RDD that gets passed into the
>>>>>>>> function we provide to foreachRDD.
>>>>>>>>
>>>>>>>> e.g.
>>>>>>>>
>>>>>>>> stream.foreachRDD { (data, time) =>
>>>>>>>>   val jobConf = ...
>>>>>>>>   data.map(prepareDataForIndexing).saveAsHadoopDataset(jobConf)
>>>>>>>> }
>>>>>>>>
>>>>>>>> Hope that helps :)
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>>
>>>>>>>> Holden :)
>>>>>>>>
>>>>>>>> On Thu, Jun 26, 2014 at 2:23 PM, boci <boci.b...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Thanks. Without the local option I can connect to ES remotely; now I
>>>>>>>>> only have one problem. How can I use elasticsearch-hadoop with Spark
>>>>>>>>> Streaming? I mean, DStream doesn't have a "saveAsHadoopFiles" method, and
>>>>>>>>> my second problem is that the output index depends on the input data.
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>>
>>>>>>>>> On Thu, Jun 26, 2014 at 10:10 AM, Nick Pentreath <nick.pentre...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> You can just add elasticsearch-hadoop as a dependency to your
>>>>>>>>>> project to use the ESInputFormat and ESOutputFormat (
>>>>>>>>>> https://github.com/elasticsearch/elasticsearch-hadoop). Some
>>>>>>>>>> other basics here:
>>>>>>>>>> http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/current/spark.html
>>>>>>>>>>
>>>>>>>>>> For testing, yes, I think you will need to start ES in local mode
>>>>>>>>>> (just ./bin/elasticsearch) and use the default config (host = localhost,
>>>>>>>>>> port = 9200).
>>>>>>>>>>
>>>>>>>>>> On Thu, Jun 26, 2014 at 9:04 AM, boci <boci.b...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> That's okay, but Hadoop has ES integration. What happens if I
>>>>>>>>>>> run saveAsHadoopFile without Hadoop (or do I need to pull up Hadoop
>>>>>>>>>>> programmatically, if I can)?
>>>>>>>>>>>
>>>>>>>>>>> b0c1
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Jun 26, 2014 at 1:20 AM, Holden Karau <hol...@pigscanfly.ca> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Jun 25, 2014 at 4:16 PM, boci <boci.b...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi guys, thanks for the direction. Now I have some problems/questions:
>>>>>>>>>>>>> - in local (test) mode I want to use ElasticClient.local to
>>>>>>>>>>>>> create the ES connection, but in production I want to use ElasticClient.remote;
>>>>>>>>>>>>> for this I want to pass the ElasticClient to mapPartitions, or
>>>>>>>>>>>>> what is the best practice?
>>>>>>>>>>>>>
>>>>>>>>>>>> In this case you probably want to make the ElasticClient inside
>>>>>>>>>>>> of mapPartitions (since it isn't serializable), and if you want to use a
>>>>>>>>>>>> different client in local mode just have a flag that controls what type of
>>>>>>>>>>>> client you create.
>>>>>>>>>>>>
>>>>>>>>>>>>> - my stream output is written into Elasticsearch. How can I
>>>>>>>>>>>>> test output.saveAsHadoopFile[ESOutputFormat]("-") in a local
>>>>>>>>>>>>> environment?
>>>>>>>>>>>>> - After storing the enriched data in ES, I want to generate
>>>>>>>>>>>>> aggregated data (EsInputFormat); how can I test that locally?
>>>>>>>>>>>>>
>>>>>>>>>>>> I think the simplest thing to do would be to use the same client
>>>>>>>>>>>> in both modes and just start a single-node Elasticsearch cluster.
>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks guys
>>>>>>>>>>>>>
>>>>>>>>>>>>> b0c1
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Jun 25, 2014 at 1:33 AM, Holden Karau <hol...@pigscanfly.ca> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> So I'm giving a talk at the Spark Summit on using Spark &
>>>>>>>>>>>>>> Elasticsearch, but for now, if you want to see a simple demo which uses
>>>>>>>>>>>>>> Elasticsearch for geo input, you can take a look at my quick & dirty
>>>>>>>>>>>>>> implementation with TopTweetsInALocation (
>>>>>>>>>>>>>> https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/TopTweetsInALocation.scala
>>>>>>>>>>>>>> ). This approach uses the ESInputFormat, which avoids the difficulty of
>>>>>>>>>>>>>> having to manually create Elasticsearch clients.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> This approach might not work for your data, e.g. if you need
>>>>>>>>>>>>>> to create a query for each record in your RDD. If this is the case, you
>>>>>>>>>>>>>> could instead look at using mapPartitions and setting up your Elasticsearch
>>>>>>>>>>>>>> connection inside of that, so you could then re-use the client for all of
>>>>>>>>>>>>>> the queries on each partition. This approach avoids having to serialize
>>>>>>>>>>>>>> the Elasticsearch connection because it stays local to your function.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hope this helps!
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Holden :)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Tue, Jun 24, 2014 at 4:28 PM, Mayur Rustagi <mayur.rust...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> It's not used as the default serializer because of some issues with
>>>>>>>>>>>>>>> compatibility & the requirement to register the classes..
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Which part are you getting as non-serializable... you need to
>>>>>>>>>>>>>>> serialize that class if you are sending it to Spark workers inside a map,
>>>>>>>>>>>>>>> reduce, mapPartitions or any of the operations on an RDD.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Mayur Rustagi
>>>>>>>>>>>>>>> Ph: +1 (760) 203 3257
>>>>>>>>>>>>>>> http://www.sigmoidanalytics.com
>>>>>>>>>>>>>>> @mayur_rustagi <https://twitter.com/mayur_rustagi>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Wed, Jun 25, 2014 at 4:52 AM, Peng Cheng <pc...@uow.edu.au> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I'm afraid persisting a connection across two tasks is a dangerous act, as
>>>>>>>>>>>>>>>> they can't be guaranteed to be executed on the same machine. Your ES server
>>>>>>>>>>>>>>>> may think it's a man-in-the-middle attack!
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I think it's possible to invoke a static method that gives you a connection
>>>>>>>>>>>>>>>> from a local 'pool', so nothing sneaks into your closure, but that's too
>>>>>>>>>>>>>>>> complex and there should be a better option.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I've never used Kryo before; if it's that good, perhaps we should use it as
>>>>>>>>>>>>>>>> the default serializer.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>> View this message in context:
>>>>>>>>>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/ElasticSearch-enrich-tp8209p8222.html
>>>>>>>>>>>>>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
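Putting the pieces above together, the foreachRDD + saveAsHadoopDataset approach with a per-document ("dynamic") index might look roughly like the sketch below. This is untested; the field name index_id, the type name events, and the ES address are placeholders, and it assumes a version of elasticsearch-hadoop whose EsOutputFormat supports multi-resource writes (the "dynamic resources" mentioned earlier in the thread). The exact JobConf boilerplate (output committer, dummy output path) follows the old mapred-API examples and may differ in your setup.

    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.io.{MapWritable, NullWritable, Text}
    import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf}
    import org.apache.spark.SparkContext._
    import org.apache.spark.streaming.dstream.DStream
    import org.elasticsearch.hadoop.mr.EsOutputFormat

    // Turn one enriched record into the (ignored key, MapWritable) pair that
    // EsOutputFormat expects; "index_id" is the hypothetical field that names
    // the target index for this document.
    def toWritable(record: Map[String, String]): (NullWritable, MapWritable) = {
      val doc = new MapWritable()
      record.foreach { case (k, v) => doc.put(new Text(k), new Text(v)) }
      (NullWritable.get, doc)
    }

    def saveToEs(enriched: DStream[Map[String, String]]): Unit =
      enriched.foreachRDD { rdd =>
        // The JobConf is built on the driver once per batch, inside foreachRDD.
        val jobConf = new JobConf()
        jobConf.setOutputFormat(classOf[EsOutputFormat])
        jobConf.set("es.nodes", "localhost:9200")            // single-node ES for tests
        // Dynamic ("multi-resource") write: es-hadoop substitutes {index_id}
        // with the value of that field in each document, so the index is
        // chosen per record rather than fixed in the JobConf.
        jobConf.set("es.resource.write", "{index_id}/events")
        jobConf.setOutputCommitter(classOf[FileOutputCommitter])
        FileOutputFormat.setOutputPath(jobConf, new Path("-")) // dummy path, ES ignores it
        rdd.map(toWritable).saveAsHadoopDataset(jobConf)
      }

Because the index is resolved from the document itself, there is no need to read the data before building the JobConf, which was the original question about getting information from the items inside foreachRDD.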
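And a rough sketch of the mapPartitions pattern Holden describes for per-record enrichment queries. ElasticClient.local / ElasticClient.remote are the constructors named earlier in the thread (elastic4s-style); the import path, the remote() signature, the Doc type and the esHost parameter are all assumptions to adjust to your client library.

    import org.apache.spark.rdd.RDD
    import com.sksamuel.elastic4s.ElasticClient   // assumed client library

    // Hypothetical record type; the real enrichment logic goes where the comment is.
    case class Doc(id: String, indexId: String, payload: String)

    def enrich(input: RDD[Doc], useLocalEs: Boolean, esHost: String): RDD[Doc] =
      input.mapPartitions { docs =>
        // The client is created here, on the worker, once per partition, so the
        // non-serializable connection never has to be shipped inside the closure.
        // A simple flag switches between the local test client and the remote one.
        val client =
          if (useLocalEs) ElasticClient.local
          else ElasticClient.remote(esHost, 9300)
        docs.map { doc =>
          // ... issue the enrichment query for this doc with `client` ...
          doc
        }
      }

Note that the iterator returned from mapPartitions is consumed lazily, so closing the client eagerly at the end of the function would break the queries; if your client needs an explicit close, wrap the iterator so the close happens after the last element is produced.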