This is a simple ScalaTest test. I create a SparkConf, set the master to local
(and set the serializer etc.), bring up the Kafka and ES connections, send a
message to Kafka, and wait 30 seconds for processing.
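
For reference, a setup like the one described above might look roughly like the
sketch below. The application name, the local[4] thread count (the value Holden
asks about further down) and the choice of Kryo are assumptions for
illustration, not the actual test code. With a receiver-based input such as
Kafka, a plain "local" master gives Spark a single thread, which the receiver
occupies, so the master generally needs to be local[n] with n larger than the
number of receivers before any batch is processed.

```scala
import org.apache.spark.SparkConf

object LocalTestConf {
  // Assumed values: the app name and thread count are illustrative only.
  val conf: SparkConf = new SparkConf()
    .setMaster("local[4]")        // more threads than receivers
    .setAppName("enrich-test")    // hypothetical name
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
}
```

The conf would then be passed to a StreamingContext in the test.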

It runs in IDEA, no magic tricks.
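
The alternative to a debugger breakpoint that Holden mentions below (causing a
side effect such as writing to a file) could be sketched as follows. This is
plain Scala with no Spark dependency; the names traced, label and path are
hypothetical and chosen only for this example.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths, StandardOpenOption}

object Trace {
  // Wraps a function so that every call appends one line to the file at `path`.
  // `label` and `path` are hypothetical names chosen for this sketch.
  def traced[A, B](label: String, path: String)(f: A => B): A => B = { a =>
    val line = s"$label called with $a\n"
    Files.write(
      Paths.get(path),
      line.getBytes(StandardCharsets.UTF_8),
      StandardOpenOption.CREATE,
      StandardOpenOption.APPEND)
    f(a)
  }
}
```

Wrapping an enrich step, e.g. stream.map(Trace.traced("enrichZ", "/tmp/enrich-trace.log")(enrichZ))
with enrichZ standing in for the real function, leaves one line in the file per
processed record. If the file stays empty after a batch, the function was never
invoked. On a real cluster the file is written on whichever machine runs the task.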

b0c1

----------------------------------------------------------------------------------------------------------------------------------
Skype: boci13, Hangout: boci.b...@gmail.com


On Fri, Jun 27, 2014 at 11:11 PM, Holden Karau <hol...@pigscanfly.ca> wrote:

> So a few quick questions:
>
> 1) What cluster are you running this against? Is it just local? Have you
> tried local[4]?
> 2) When you say breakpoint, how are you setting this break point? There is
> a good chance your breakpoint mechanism doesn't work in a distributed
> environment, could you instead cause a side effect (like writing to a file)?
>
> Cheers,
>
> Holden :)
>
>
> On Fri, Jun 27, 2014 at 2:04 PM, boci <boci.b...@gmail.com> wrote:
>
>> Ok I found dynamic resources, but I have a frustrating problem. This is
>> the flow:
>> kafka -> enrich X -> enrich Y -> enrich Z -> foreachRDD -> save
>>
>> My problem is that this doesn't work: the enrich functions are not
>> called. But if I add a print, they are. For example, if I do this:
>> kafka -> enrich X -> enrich Y -> print -> enrich Z -> foreachRDD
>>
>> then enrich X and enrich Y are called, but enrich Z is not.
>> If I put the print after enrich Z, the output is printed. How can I
>> solve this? (What can I do to get foreachRDD called? I put a breakpoint
>> inside the map function (where I generate the writable), but it is not hit.)
>>
>> Any idea?
>>
>> b0c1
>>
>>
>>
>>
>>
>>
>> On Fri, Jun 27, 2014 at 4:53 PM, boci <boci.b...@gmail.com> wrote:
>>
>>> Another question. I initialize the JobConf inside foreachRDD, but
>>> at that point how can I get information from the items?
>>> The data contains an identifier that determines the required ES index
>>> (so how can I set the index dynamically in foreachRDD)?
>>>
>>> b0c1
>>>
>>>
>>>
>>>
>>> On Fri, Jun 27, 2014 at 12:30 AM, Holden Karau <hol...@pigscanfly.ca>
>>> wrote:
>>>
>>>> Just your luck I happened to be working on that very talk today :) Let
>>>> me know how your experiences with Elasticsearch & Spark go :)
>>>>
>>>>
>>>> On Thu, Jun 26, 2014 at 3:17 PM, boci <boci.b...@gmail.com> wrote:
>>>>
>>>>> Wow, thanks for your fast answer, it helps a lot...
>>>>>
>>>>> b0c1
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Thu, Jun 26, 2014 at 11:48 PM, Holden Karau <hol...@pigscanfly.ca>
>>>>> wrote:
>>>>>
>>>>>> Hi b0c1,
>>>>>>
>>>>>> I have an example of how to do this in the repo for my talk as well,
>>>>>> the specific example is at
>>>>>> https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/IndexTweetsLive.scala
>>>>>> . Since DStream doesn't have a saveAsHadoopDataset we use foreachRDD and
>>>>>> then call saveAsHadoopDataset on the RDD that gets passed into the
>>>>>> function we provide to foreachRDD.
>>>>>>
>>>>>> e.g.
>>>>>>
>>>>>> stream.foreachRDD{(data, time) =>
>>>>>>      val jobConf = ...
>>>>>>      data.map(prepareDataForIndexing).saveAsHadoopDataset(jobConf)
>>>>>> }
>>>>>>
>>>>>> Hope that helps :)
>>>>>>
>>>>>> Cheers,
>>>>>>
>>>>>> Holden :)
>>>>>>
>>>>>>
>>>>>> On Thu, Jun 26, 2014 at 2:23 PM, boci <boci.b...@gmail.com> wrote:
>>>>>>
>>>>>>> Thanks. Without the local option I can connect to the remote ES; now I
>>>>>>> only have one problem. How can I use elasticsearch-hadoop with Spark
>>>>>>> Streaming? I mean, DStream doesn't have a "saveAsHadoopFiles" method. My
>>>>>>> second problem is that the output index depends on the input data.
>>>>>>>
>>>>>>> Thanks
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Jun 26, 2014 at 10:10 AM, Nick Pentreath <
>>>>>>> nick.pentre...@gmail.com> wrote:
>>>>>>>
>>>>>>>> You can just add elasticsearch-hadoop as a dependency to your
>>>>>>>> project to use the ESInputFormat and ESOutputFormat (
>>>>>>>> https://github.com/elasticsearch/elasticsearch-hadoop). Some other
>>>>>>>> basics here:
>>>>>>>> http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/current/spark.html
>>>>>>>>
>>>>>>>> For testing, yes I think you will need to start ES in local mode
>>>>>>>> (just ./bin/elasticsearch) and use the default config (host = localhost,
>>>>>>>> port = 9200).
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Jun 26, 2014 at 9:04 AM, boci <boci.b...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> That's okay, but Hadoop has ES integration. What happens if I run
>>>>>>>>> saveAsHadoopFile without Hadoop? (Or do I need to start Hadoop
>>>>>>>>> programmatically, if I can?)
>>>>>>>>>
>>>>>>>>> b0c1
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Jun 26, 2014 at 1:20 AM, Holden Karau <
>>>>>>>>> hol...@pigscanfly.ca> wrote:
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Wed, Jun 25, 2014 at 4:16 PM, boci <boci.b...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi guys, thanks for the direction. Now I have some problems/questions:
>>>>>>>>>>> - In local (test) mode I want to use ElasticClient.local to
>>>>>>>>>>> create the ES connection, but in production I want to use
>>>>>>>>>>> ElasticClient.remote. For this I want to pass the ElasticClient to
>>>>>>>>>>> mapPartitions, or what is the best practice?
>>>>>>>>>>>
>>>>>>>>>> In this case you probably want to make the ElasticClient inside
>>>>>>>>>> of mapPartitions (since it isn't serializable), and if you want to use a
>>>>>>>>>> different client in local mode just have a flag that controls what type
>>>>>>>>>> of client you create.
>>>>>>>>>>
>>>>>>>>>>> - My stream output is written to Elasticsearch. How can I
>>>>>>>>>>> test output.saveAsHadoopFile[ESOutputFormat]("-") in a local environment?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> - After storing the enriched data in ES, I want to generate
>>>>>>>>>>> aggregated data (EsInputFormat). How can I test it locally?
>>>>>>>>>>>
>>>>>>>>>> I think the simplest thing to do would be to use the same client in
>>>>>>>>>> both modes and just start a single-node Elasticsearch cluster.
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Thanks guys
>>>>>>>>>>>
>>>>>>>>>>> b0c1
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Jun 25, 2014 at 1:33 AM, Holden Karau <
>>>>>>>>>>> hol...@pigscanfly.ca> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> So I'm giving a talk at the Spark summit on using Spark &
>>>>>>>>>>>> ElasticSearch, but for now if you want to see a simple demo which uses
>>>>>>>>>>>> elasticsearch for geo input you can take a look at my quick & dirty
>>>>>>>>>>>> implementation with TopTweetsInALocation (
>>>>>>>>>>>> https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/TopTweetsInALocation.scala
>>>>>>>>>>>> ). This approach uses the ESInputFormat which avoids the difficulty of
>>>>>>>>>>>> having to manually create ElasticSearch clients.
>>>>>>>>>>>>
>>>>>>>>>>>> This approach might not work for your data, e.g. if you need to
>>>>>>>>>>>> create a query for each record in your RDD. If this is the case, you could
>>>>>>>>>>>> instead look at using mapPartitions and setting up your Elasticsearch
>>>>>>>>>>>> connection inside of that, so you could then re-use the client for all of
>>>>>>>>>>>> the queries on each partition. This approach will avoid having to serialize
>>>>>>>>>>>> the Elasticsearch connection because it will be local to your function.
>>>>>>>>>>>>
>>>>>>>>>>>> Hope this helps!
>>>>>>>>>>>>
>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>
>>>>>>>>>>>> Holden :)
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Jun 24, 2014 at 4:28 PM, Mayur Rustagi <
>>>>>>>>>>>> mayur.rust...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> It's not used as the default serializer because of some compatibility
>>>>>>>>>>>>> issues and the requirement to register the classes.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Which part are you getting as non-serializable? You need to make that
>>>>>>>>>>>>> class serializable if you are sending it to Spark workers inside a map,
>>>>>>>>>>>>> reduce, mapPartitions or any of the other operations on an RDD.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Mayur Rustagi
>>>>>>>>>>>>> Ph: +1 (760) 203 3257
>>>>>>>>>>>>> http://www.sigmoidanalytics.com
>>>>>>>>>>>>> @mayur_rustagi <https://twitter.com/mayur_rustagi>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Jun 25, 2014 at 4:52 AM, Peng Cheng <pc...@uow.edu.au>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> I'm afraid persisting a connection across two tasks is
>>>>>>>>>>>>>> dangerous, as they can't be guaranteed to be executed on the same
>>>>>>>>>>>>>> machine. Your ES server may think it's a man-in-the-middle attack!
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I think it's possible to invoke a static method that gives you a
>>>>>>>>>>>>>> connection from a local 'pool', so nothing will sneak into your closure,
>>>>>>>>>>>>>> but it's too complex and there should be a better option.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I've never used Kryo before; if it's that good, perhaps we should use it
>>>>>>>>>>>>>> as the default serializer.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> --
>>>>>>>>>>>>>> View this message in context:
>>>>>>>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/ElasticSearch-enrich-tp8209p8222.html
>>>>>>>>>>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>>>>>>>>>>> Nabble.com.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> Cell : 425-233-8271
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>>
>>>
>>>
>>
>
>
>
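
To make the mapPartitions advice from earlier in the thread concrete (create
the client inside the partition function, and pick local or remote with a
flag), here is a minimal sketch in plain Scala. Client, LocalClient,
RemoteClient and lookup are hypothetical stand-ins, not the API of any real
Elasticsearch client; the point is only the shape of the function one would
pass to mapPartitions.

```scala
object EnrichPartition {
  // Hypothetical stand-in for a non-serializable ES client.
  trait Client {
    def lookup(key: String): String
    def close(): Unit
  }
  class LocalClient extends Client {
    def lookup(key: String): String = s"local:$key"
    def close(): Unit = ()
  }
  class RemoteClient(host: String) extends Client {
    def lookup(key: String): String = s"$host:$key"
    def close(): Unit = ()
  }

  // Only the Boolean flag and the host string are captured by the closure;
  // the client itself is built on the worker, once per partition.
  def enrich(useLocal: Boolean, host: String)(records: Iterator[String]): Iterator[String] = {
    val client: Client = if (useLocal) new LocalClient else new RemoteClient(host)
    // Materialise the results before closing, because Iterator.map is lazy.
    val out = records.map(client.lookup).toList
    client.close()
    out.iterator
  }
}
```

With an RDD[String] this would be used as
rdd.mapPartitions(it => EnrichPartition.enrich(useLocal = true, host = "localhost")(it)).
Collecting each partition into a List keeps the sketch simple at the cost of
holding one partition's results in memory; a real job might close the client
lazily instead.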
