Thanks, more local thread solve the problem, it's work like a charm. How
many thread required?
Adrian: it's not public project but ask, and I will answer (if I can)...
maybe later I will create a demo project based on my solution.

b0c1

----------------------------------------------------------------------------------------------------------------------------------
Skype: boci13, Hangout: boci.b...@gmail.com


On Fri, Jun 27, 2014 at 11:31 PM, Holden Karau <hol...@pigscanfly.ca> wrote:

> Try setting the master to local[4]
>
>
> On Fri, Jun 27, 2014 at 2:17 PM, boci <boci.b...@gmail.com> wrote:
>
>> This is a simply scalatest. I start a SparkConf, set the master to local
>> (set the serializer etc), pull up kafka and es connection send a message to
>> kafka and wait 30sec to processing.
>>
>> It's run in IDEA no magick trick.
>>
>> b0c1
>>
>>
>> ----------------------------------------------------------------------------------------------------------------------------------
>> Skype: boci13, Hangout: boci.b...@gmail.com
>>
>>
>> On Fri, Jun 27, 2014 at 11:11 PM, Holden Karau <hol...@pigscanfly.ca>
>> wrote:
>>
>>> So a few quick questions:
>>>
>>> 1) What cluster are you running this against? Is it just local? Have you
>>> tried local[4]?
>>> 2) When you say breakpoint, how are you setting this break point? There
>>> is a good chance your breakpoint mechanism doesn't work in a distributed
>>> environment, could you instead cause a side effect (like writing to a file)?
>>>
>>> Cheers,
>>>
>>> Holden :)
>>>
>>>
>>> On Fri, Jun 27, 2014 at 2:04 PM, boci <boci.b...@gmail.com> wrote:
>>>
>>>> Ok I found dynamic resources, but I have a frustrating problem. This is
>>>> the flow:
>>>> kafka -> enrich X -> enrich Y -> enrich Z -> foreachRDD -> save
>>>>
>>>> My problem is: if I do this it's not work, the enrich functions not
>>>> called, but if I put a print it's does. for example if I do this:
>>>> kafka -> enrich X -> enrich Y -> print -> enrich Z -> foreachRDD
>>>>
>>>> The enrich X and enrich Y called but enrich Z not
>>>> if I put the print after the enrich Z it's will be printed. How can I
>>>> solve this? (what can I do to call the foreachRDD I put breakpoint inside
>>>> the map function (where I'm generate the writable) but it's not called)
>>>>
>>>> Any idea?
>>>>
>>>> b0c1
>>>>
>>>>
>>>>
>>>>
>>>> ----------------------------------------------------------------------------------------------------------------------------------
>>>> Skype: boci13, Hangout: boci.b...@gmail.com
>>>>
>>>>
>>>> On Fri, Jun 27, 2014 at 4:53 PM, boci <boci.b...@gmail.com> wrote:
>>>>
>>>>> Another question. In the foreachRDD I will initialize the JobConf, but
>>>>> in this place how can I get information from the items?
>>>>> I have an identifier in the data which identify the required ES index
>>>>> (so how can I set dynamic index in the foreachRDD) ?
>>>>>
>>>>> b0c1
>>>>>
>>>>>
>>>>> ----------------------------------------------------------------------------------------------------------------------------------
>>>>> Skype: boci13, Hangout: boci.b...@gmail.com
>>>>>
>>>>>
>>>>> On Fri, Jun 27, 2014 at 12:30 AM, Holden Karau <hol...@pigscanfly.ca>
>>>>> wrote:
>>>>>
>>>>>> Just your luck I happened to be working on that very talk today :)
>>>>>> Let me know how your experiences with Elasticsearch & Spark go :)
>>>>>>
>>>>>>
>>>>>> On Thu, Jun 26, 2014 at 3:17 PM, boci <boci.b...@gmail.com> wrote:
>>>>>>
>>>>>>> Wow, thanks your fast answer, it's help a lot...
>>>>>>>
>>>>>>> b0c1
>>>>>>>
>>>>>>>
>>>>>>> ----------------------------------------------------------------------------------------------------------------------------------
>>>>>>> Skype: boci13, Hangout: boci.b...@gmail.com
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Jun 26, 2014 at 11:48 PM, Holden Karau <hol...@pigscanfly.ca
>>>>>>> > wrote:
>>>>>>>
>>>>>>>> Hi b0c1,
>>>>>>>>
>>>>>>>> I have an example of how to do this in the repo for my talk as
>>>>>>>> well, the specific example is at
>>>>>>>> https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/IndexTweetsLive.scala
>>>>>>>> . Since DStream doesn't have a saveAsHadoopDataset we use foreachRDD 
>>>>>>>> and
>>>>>>>> then call  saveAsHadoopDataset on the RDD that gets passed into the
>>>>>>>> function we provide to foreachRDD.
>>>>>>>>
>>>>>>>> e.g.
>>>>>>>>
>>>>>>>> stream.foreachRDD{(data, time) =>
>>>>>>>>      val jobconf = ...
>>>>>>>>      data.map(prepareDataForIndexing).saveAsHadoopDataset(jobConf)
>>>>>>>> }
>>>>>>>>
>>>>>>>> Hope that helps :)
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>>
>>>>>>>> Holden :)
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Jun 26, 2014 at 2:23 PM, boci <boci.b...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Thanks. I without local option I can connect with es remote, now I
>>>>>>>>> only have one problem. How can I use elasticsearch-hadoop with spark
>>>>>>>>> streaming? I mean DStream doesn't have "saveAsHadoopFiles" method, my
>>>>>>>>> second problem the output index is depend by the input data.
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> ----------------------------------------------------------------------------------------------------------------------------------
>>>>>>>>> Skype: boci13, Hangout: boci.b...@gmail.com
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Jun 26, 2014 at 10:10 AM, Nick Pentreath <
>>>>>>>>> nick.pentre...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> You can just add elasticsearch-hadoop as a dependency to your
>>>>>>>>>> project to user the ESInputFormat and ESOutputFormat (
>>>>>>>>>> https://github.com/elasticsearch/elasticsearch-hadoop). Some
>>>>>>>>>> other basics here:
>>>>>>>>>> http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/current/spark.html
>>>>>>>>>>
>>>>>>>>>> For testing, yes I think you will need to start ES in local mode
>>>>>>>>>> (just ./bin/elasticsearch) and use the default config (host = 
>>>>>>>>>> localhost,
>>>>>>>>>> port = 9200).
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, Jun 26, 2014 at 9:04 AM, boci <boci.b...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> That's okay, but hadoop has ES integration. what happened if I
>>>>>>>>>>> run saveAsHadoopFile without hadoop (or I must need to pull up 
>>>>>>>>>>> hadoop
>>>>>>>>>>> programatically? (if I can))
>>>>>>>>>>>
>>>>>>>>>>> b0c1
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> ----------------------------------------------------------------------------------------------------------------------------------
>>>>>>>>>>> Skype: boci13, Hangout: boci.b...@gmail.com
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Jun 26, 2014 at 1:20 AM, Holden Karau <
>>>>>>>>>>> hol...@pigscanfly.ca> wrote:
>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Jun 25, 2014 at 4:16 PM, boci <boci.b...@gmail.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi guys, thanks the direction now I have some problem/question:
>>>>>>>>>>>>> - in local (test) mode I want to use ElasticClient.local to
>>>>>>>>>>>>> create es connection, but in prodution I want to use 
>>>>>>>>>>>>> ElasticClient.remote,
>>>>>>>>>>>>> to this I want to pass ElasticClient to mapPartitions, or
>>>>>>>>>>>>> what is the best practices?
>>>>>>>>>>>>>
>>>>>>>>>>>> In this case you probably want to make the ElasticClient inside
>>>>>>>>>>>> of mapPartitions (since it isn't serializable) and if you want to 
>>>>>>>>>>>> use a
>>>>>>>>>>>> different client in local mode just have a flag that control what 
>>>>>>>>>>>> type of
>>>>>>>>>>>> client you create.
>>>>>>>>>>>>
>>>>>>>>>>>>> - my stream output is write into elasticsearch. How can I
>>>>>>>>>>>>> test output.saveAsHadoopFile[ESOutputFormat]("-") in local 
>>>>>>>>>>>>> environment?
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>> - After store the enriched data into ES, I want to generate
>>>>>>>>>>>>> aggregated data (EsInputFormat) how can I test it in local?
>>>>>>>>>>>>>
>>>>>>>>>>>> I think the simplest thing to do would be use the same client
>>>>>>>>>>>> in mode and just start single node elastic search cluster.
>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks guys
>>>>>>>>>>>>>
>>>>>>>>>>>>> b0c1
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> ----------------------------------------------------------------------------------------------------------------------------------
>>>>>>>>>>>>> Skype: boci13, Hangout: boci.b...@gmail.com
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Jun 25, 2014 at 1:33 AM, Holden Karau <
>>>>>>>>>>>>> hol...@pigscanfly.ca> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> So I'm giving a talk at the Spark summit on using Spark &
>>>>>>>>>>>>>> ElasticSearch, but for now if you want to see a simple demo 
>>>>>>>>>>>>>> which uses
>>>>>>>>>>>>>> elasticsearch for geo input you can take a look at my quick & 
>>>>>>>>>>>>>> dirty
>>>>>>>>>>>>>> implementation with TopTweetsInALocation (
>>>>>>>>>>>>>> https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/TopTweetsInALocation.scala
>>>>>>>>>>>>>> ). This approach uses the ESInputFormat which avoids the 
>>>>>>>>>>>>>> difficulty of
>>>>>>>>>>>>>> having to manually create ElasticSearch clients.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> This approach might not work for your data, e.g. if you need
>>>>>>>>>>>>>> to create a query for each record in your RDD. If this is the 
>>>>>>>>>>>>>> case, you
>>>>>>>>>>>>>> could instead look at using mapPartitions and setting up your 
>>>>>>>>>>>>>> Elasticsearch
>>>>>>>>>>>>>> connection inside of that, so you could then re-use the client 
>>>>>>>>>>>>>> for all of
>>>>>>>>>>>>>> the queries on each partition. This approach will avoid having 
>>>>>>>>>>>>>> to serialize
>>>>>>>>>>>>>> the Elasticsearch connection because it will be local to your 
>>>>>>>>>>>>>> function.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hope this helps!
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Holden :)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Tue, Jun 24, 2014 at 4:28 PM, Mayur Rustagi <
>>>>>>>>>>>>>> mayur.rust...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Its not used as default serializer for some issues with
>>>>>>>>>>>>>>> compatibility & requirement to register the classes..
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Which part are you getting as nonserializable... you need to
>>>>>>>>>>>>>>> serialize that class if you are sending it to spark workers 
>>>>>>>>>>>>>>> inside a map,
>>>>>>>>>>>>>>> reduce , mappartition or any of the operations on RDD.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Mayur Rustagi
>>>>>>>>>>>>>>> Ph: +1 (760) 203 3257
>>>>>>>>>>>>>>> http://www.sigmoidanalytics.com
>>>>>>>>>>>>>>> @mayur_rustagi <https://twitter.com/mayur_rustagi>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Wed, Jun 25, 2014 at 4:52 AM, Peng Cheng <
>>>>>>>>>>>>>>> pc...@uow.edu.au> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I'm afraid persisting connection across two tasks is a
>>>>>>>>>>>>>>>> dangerous act as they
>>>>>>>>>>>>>>>> can't be guaranteed to be executed on the same machine.
>>>>>>>>>>>>>>>> Your ES server may
>>>>>>>>>>>>>>>> think its a man-in-the-middle attack!
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I think its possible to invoke a static method that give
>>>>>>>>>>>>>>>> you a connection in
>>>>>>>>>>>>>>>> a local 'pool', so nothing will sneak into your closure,
>>>>>>>>>>>>>>>> but its too complex
>>>>>>>>>>>>>>>> and there should be a better option.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Never use kryo before, if its that good perhaps we should
>>>>>>>>>>>>>>>> use it as the
>>>>>>>>>>>>>>>> default serializer
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>> View this message in context:
>>>>>>>>>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/ElasticSearch-enrich-tp8209p8222.html
>>>>>>>>>>>>>>>> Sent from the Apache Spark User List mailing list archive
>>>>>>>>>>>>>>>> at Nabble.com.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> --
>>>>>>>>>>>>>> Cell : 425-233-8271
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> Cell : 425-233-8271
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Cell : 425-233-8271
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Cell : 425-233-8271
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>>
>>> --
>>> Cell : 425-233-8271
>>>
>>
>>
>
>
> --
> Cell : 425-233-8271
>

Reply via email to