

Re: REST Structured Streaming Sink

2020-07-03 Thread Sam Elamin
Hi Folks,

Great discussion! I will take rate limiting into account and make it
configurable for the HTTP requests.

I was wondering if there is anything I might have overlooked that would make
this technically impossible, or at least difficult enough not to warrant the
effort. Also, would this be useful to people?

My thinking, from a business perspective, is: why make people wait until the
next scheduled batch run for data that is already available from an API? You
could run a job every minute or hour, but that in itself sounds like a
streaming use case
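
For anyone who wants this behaviour today, a user-land version is already
possible with the existing ForeachWriter API (as Burak notes below). A rough,
untested sketch; the endpoint, payload handling and error handling are
illustrative only:

import org.apache.spark.sql.ForeachWriter

// Untested sketch: POST each record of a streaming Dataset[String] to an HTTP endpoint.
// Rate limiting, retries and batching would go into process()/open().
class HttpSink(endpoint: String) extends ForeachWriter[String] {

  override def open(partitionId: Long, epochId: Long): Boolean = true

  override def process(value: String): Unit = {
    val conn = new java.net.URL(endpoint)
      .openConnection().asInstanceOf[java.net.HttpURLConnection]
    conn.setRequestMethod("POST")
    conn.setRequestProperty("Content-Type", "application/json")
    conn.setDoOutput(true)
    conn.getOutputStream.write(value.getBytes("UTF-8"))
    conn.getResponseCode // real code would check the status and back off / retry
    conn.disconnect()
  }

  override def close(errorOrNull: Throwable): Unit = ()
}

// usage, assuming jsonStream is a streaming Dataset[String]:
// jsonStream.writeStream.foreach(new HttpSink("https://example.com/ingest")).start()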

Thoughts?

Regards
Sam

On Thu, Jul 2, 2020 at 3:31 AM Burak Yavuz  wrote:

> Well, the difference is, a technical user writes the UDF and a
> non-technical user may use this built-in thing (misconfigure it) and shoot
> themselves in the foot.
>
> On Wed, Jul 1, 2020, 6:40 PM Andrew Melo  wrote:
>
>> On Wed, Jul 1, 2020 at 8:13 PM Burak Yavuz  wrote:
>> >
>> > I'm not sure having a built-in sink that allows you to DDOS servers is
>> the best idea either. foreachWriter is typically used for such use cases,
>> not foreachBatch. It's also pretty hard to guarantee exactly-once, rate
>> limiting, etc.
>>
>> If you control the machines and can run arbitrary code, you can DDOS
>> whatever you want. What's the difference between this proposal and
>> writing a UDF that opens 1,000 connections to a target machine?
>>
>> > Best,
>> > Burak
>> >
>> > On Wed, Jul 1, 2020 at 5:54 PM Holden Karau 
>> wrote:
>> >>
>> >> I think adding something like this (if it doesn't already exist) could
>> help make structured streaming easier to use, foreachBatch is not the best
>> API.
>> >>
>> >> On Wed, Jul 1, 2020 at 2:21 PM Jungtaek Lim <
>> kabhwan.opensou...@gmail.com> wrote:
>> >>>
>> >>> I guess the method, query parameter, header, and the payload would be
>> all different for almost every use case - that makes it hard to generalize
>> and requires implementation to be pretty much complicated to be flexible
>> enough.
>> >>>
>> >>> I'm not aware of any custom sink implementing REST, so your best bet
>> would be simply implementing your own with foreachBatch, but someone
>> might jump in and provide a pointer if there is something in the Spark
>> ecosystem.
>> >>>
>> >>> Thanks,
>> >>> Jungtaek Lim (HeartSaVioR)
>> >>>
>> >>> On Thu, Jul 2, 2020 at 3:21 AM Sam Elamin 
>> wrote:
>> >>>>
>> >>>> Hi All,
>> >>>>
>> >>>>
>> >>>> We ingest a lot of RESTful APIs into our lake and I'm wondering if it
>> is at all possible to create a REST sink in structured streaming?
>> >>>>
>> >>>> For now I'm only focusing on restful services that have an
>> incremental ID so my sink can just poll for new data then ingest.
>> >>>>
>> >>>> I can't seem to find a connector that does this and my gut instinct
>> tells me it's probably because it isn't possible due to something
>> completely obvious that I am missing
>> >>>>
>> >>>> I know some RESTful API obfuscate the IDs to a hash of strings and
>> that could be a problem but since I'm planning on focusing on just
>> numerical IDs that just get incremented I think I won't be facing that issue
>> >>>>
>> >>>>
>> >>>> Can anyone let me know if this sounds like a daft idea? Will I need
>> something like Kafka or kinesis as a buffer and redundancy or am I
>> overthinking this?
>> >>>>
>> >>>>
>> >>>> I would love to bounce ideas with people who run structured
>> streaming jobs in production
>> >>>>
>> >>>>
>> >>>> Kind regards
>> >>>> Sam
>> >>>>
>> >>>>
>> >>
>> >>
>> >> --
>> >> Twitter: https://twitter.com/holdenkarau
>> >> Books (Learning Spark, High Performance Spark, etc.):
>> https://amzn.to/2MaRAG9
>> >> YouTube Live Streams: https://www.youtube.com/user/holdenkarau
>>
>


REST Structured Streaming Sink

2020-07-01 Thread Sam Elamin
Hi All,


We ingest a lot of RESTful APIs into our lake and I'm wondering if it is at
all possible to create a REST sink in structured streaming?

For now I'm only focusing on restful services that have an incremental ID
so my sink can just poll for new data then ingest.

I can't seem to find a connector that does this and my gut instinct tells
me it's probably because it isn't possible due to something completely
obvious that I am missing

I know some RESTful APIs obfuscate the IDs to a hash of strings and that
could be a problem, but since I'm planning on focusing on just numerical IDs
that get incremented I think I won't be facing that issue.


Can anyone let me know if this sounds like a daft idea? Will I need
something like Kafka or Kinesis as a buffer and for redundancy, or am I
overthinking this?


I would love to bounce ideas with people who run structured streaming jobs
in production


Kind regards
Sam


Re: Spark Scala reading from Google Cloud BigQuery table throws error

2018-12-19 Thread Sam Elamin
Hi Mich

I wrote a connector to make it easier to connect BigQuery and Spark

Have a look here https://github.com/samelamin/spark-bigquery/

Your feedback is always welcome

Kind Regards
Sam
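
P.S. If the root cause turns out to be a Guava version clash, as Jörn
suggests below, one option is to shade Guava inside an uber jar. A minimal,
untested build.sbt sketch, assuming the sbt-assembly plugin is already on
the build:

// relocate Guava so the connector's version cannot collide with the one on
// Spark's classpath
assemblyShadeRules in assembly := Seq(
  ShadeRule.rename("com.google.common.**" -> "shaded.com.google.common.@1").inAll
)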

On Tue, Dec 18, 2018 at 7:46 PM Mich Talebzadeh 
wrote:

> Thanks Jorn. I will try that. Requires installing sbt etc. on an ephemeral
> compute server in Google Cloud to build an uber jar file.
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Tue, 18 Dec 2018 at 11:16, Jörn Franke  wrote:
>
>> Maybe the guava version in your spark lib folder is not compatible (if
>> your Spark version has a guava library)? In this case i propose to create a
>> fat/uber jar potentially with a shaded guava dependency.
>>
>> Am 18.12.2018 um 11:26 schrieb Mich Talebzadeh > >:
>>
>> Hi,
>>
>> I am writing a small test code in spark-shell with attached jar
>> dependencies
>>
>> spark-shell --jars
>> /home/hduser/jars/bigquery-connector-0.13.4-hadoop3.jar,/home/hduser/jars/gcs-connector-1.9.4-hadoop3.jar,/home/hduser/jars/other/guava-19.0.jar,/home/hduser/jars/google-api-client-1.4.1-beta.jar,/home/hduser/jars/google-api-client-json-1.2.3-alpha.jar,/home/hduser/jars/google-api-services-bigquery-v2-rev20181202-1.27.0.jar
>>
>>  to read an already existing table in Google BigQuery as follows:
>>
>> import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration
>> import com.google.cloud.hadoop.io.bigquery.BigQueryFileFormat
>> import com.google.cloud.hadoop.io.bigquery.GsonBigQueryInputFormat
>> import
>> com.google.cloud.hadoop.io.bigquery.output.BigQueryOutputConfiguration
>> import
>> com.google.cloud.hadoop.io.bigquery.output.IndirectBigQueryOutputFormat
>> import com.google.gson.JsonObject
>> import org.apache.hadoop.io.LongWritable
>> import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
>> // Assumes you have a spark context (sc) -- running from spark-shell REPL.
>> // Marked as transient since configuration is not Serializable. This
>> should
>> // only be necessary in spark-shell REPL.
>> @transient
>> val conf = sc.hadoopConfiguration
>> // Input parameters.
>> val fullyQualifiedInputTableId = "axial-glow-224522.accounts.ll_18740868"
>> val projectId = conf.get("fs.gs.project.id")
>> val bucket = conf.get("fs.gs.system.bucket")
>> // Input configuration.
>> conf.set(BigQueryConfiguration.PROJECT_ID_KEY, projectId)
>> conf.set(BigQueryConfiguration.GCS_BUCKET_KEY, bucket)
>> BigQueryConfiguration.configureBigQueryInput(conf,
>> fullyQualifiedInputTableId)
>>
>> The problem I have is that even after loading jars with spark-shell --jars
>>
>> I am getting the following error at the last line
>>
>> scala> BigQueryConfiguration.configureBigQueryInput(conf,
>> fullyQualifiedInputTableId)
>>
>> java.lang.NoSuchMethodError:
>> com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object;Ljava/lang/Object;)V
>>   at
>> com.google.cloud.hadoop.io.bigquery.BigQueryStrings.parseTableReference(BigQueryStrings.java:68)
>>   at
>> com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration.configureBigQueryInput(BigQueryConfiguration.java:260)
>>   ... 49 elided
>>
>> It says it cannot find method
>>
>> java.lang.NoSuchMethodError:
>> com.google.common.base.Preconditions.checkArgument
>>
>> but I checked it and it is in the following jar file
>>
>> jar tvf guava-19.0.jar| grep common.base.Preconditions
>>   5249 Wed Dec 09 15:58:14 UTC 2015
>> com/google/common/base/Preconditions.class
>>
>> I have used different version of guava jar files but none works!
>>
>> The code is based on the following:
>>
>>
>> https://cloud.google.com/dataproc/docs/tutorials/bigquery-connector-spark-example
>>
>> Thanks
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>


Re: from_json()

2017-08-28 Thread Sam Elamin
Hi jg,

Perhaps I am misunderstanding you, but if you just want to create a new
schema for a df it's fairly simple, assuming you have the schema already
predefined or in a string, i.e.

val newSchema = DataType.fromJson(json_schema_string).asInstanceOf[StructType]

then all you need to do is re-create the dataframe using this new schema

sqlContext.createDataFrame(oldDF.rdd, newSchema)
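
Putting that together, a short untested sketch (the schema string and column
names are made up; note that DataType.fromJson returns a DataType, so it
needs the cast to StructType before it can be passed to createDataFrame):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{DataType, StructType}

val spark = SparkSession.builder().appName("reschema").getOrCreate()

// a schema previously saved via someSchema.json
val json_schema_string =
  """{"type":"struct","fields":[
    |{"name":"id","type":"long","nullable":true,"metadata":{}},
    |{"name":"name","type":"string","nullable":true,"metadata":{}}]}""".stripMargin

val newSchema = DataType.fromJson(json_schema_string).asInstanceOf[StructType]

// rebuild an existing dataframe against the new schema
val oldDF = spark.range(2).selectExpr("id", "cast(id as string) as name")
val reshaped = spark.createDataFrame(oldDF.rdd, newSchema)
reshaped.printSchema()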

Regards
Sam

On Mon, Aug 28, 2017 at 5:57 PM, JG Perrin  wrote:

> Is there a way to not have to specify a schema when using from_json() or
> infer the schema? When you read a JSON doc from disk, you can infer the
> schema. Should I write it to disk before (ouch)?
>
>
>
> jg
>


Re: [ANNOUNCE] Announcing Apache Spark 2.2.0

2017-07-17 Thread Sam Elamin
Well done!  This is amazing news :) Congrats and really can't wait to spread
the structured streaming love!

On Mon, Jul 17, 2017 at 5:25 PM, kant kodali  wrote:

> +1
>
> On Tue, Jul 11, 2017 at 3:56 PM, Jean Georges Perrin  wrote:
>
>> Awesome! Congrats! Can't wait!!
>>
>> jg
>>
>>
>> On Jul 11, 2017, at 18:48, Michael Armbrust 
>> wrote:
>>
>> Hi all,
>>
>> Apache Spark 2.2.0 is the third release of the Spark 2.x line. This
>> release removes the experimental tag from Structured Streaming. In
>> addition, this release focuses on usability, stability, and polish,
>> resolving over 1100 tickets.
>>
>> We'd like to thank our contributors and users for their contributions and
>> early feedback to this release. This release would not have been possible
>> without you.
>>
>> To download Spark 2.2.0, head over to the download page:
>> http://spark.apache.org/downloads.html
>>
>> To view the release notes: https://spark.apache.org/releases/spark-release-2-2-0.html
>>
>> *(note: If you see any issues with the release notes, webpage or
>> published artifacts, please contact me directly off-list) *
>>
>> Michael
>>
>>
>


Re: Restful API Spark Application

2017-05-12 Thread Sam Elamin
Hi Nipun

Have you checked out the job server?

https://github.com/spark-jobserver/spark-jobserver

Regards
Sam
On Fri, 12 May 2017 at 21:00, Nipun Arora  wrote:

> Hi,
>
> We have written a java spark application (primarily uses spark sql). We
> want to expand this to provide our application "as a service". For this, we
> are trying to write a REST API. While a simple REST API can be easily made,
> and I can get Spark to run through the launcher. I wonder, how the spark
> context can be used by service requests, to process data.
>
> Are there any simple JAVA examples to illustrate this use-case? I am sure
> people have faced this before.
>
>
> Thanks
> Nipun
>


Re: Spark Testing Library Discussion

2017-04-29 Thread Sam Elamin
Hi lucas


Thanks for the detailed feedback, that's really useful!

I did suggest Github but my colleague asked for an email

You raise a good point with the grammar, sure I will rephrase it. I am more
than happy to merge in the PR if you send it


That said I know you can make BDD tests using any framework but I am a
lazy developer and would rather use the framework or library defaults to
make it easier for other devs to pick up.

The number of rows is only a start, correct; we can add more tests to check
the transformed version, but I was going to point that out in a future
part of the series since this one is mainly about raw extracts.


Thank you very much for the feedback and I will be sure to add it once I
have more feedback


Maybe we can create a gist of all this or even a tiny book on best
practices if people find it useful

Looking forward to the PR!

Regards
Sam





On Sat, 29 Apr 2017 at 06:36, lucas.g...@gmail.com <lucas.g...@gmail.com>
wrote:

> Awesome, thanks.
>
> Just reading your post
>
> A few observations:
> 1) You're giving out Marius's email: "I have been lucky enough to
> build this pipeline with the amazing Marius Feteanu".  A LinkedIn or
> GitHub link might be more helpful.
>
> 2) "If you are in Pyspark world sadly Holden’s test base wont work so
> I suggest you check out Pytest and pytest-bdd.".  doesn't read well to
> me, on first read I was wondering if Spark-Test-Base wasn't available
> in python... It took me about 20 seconds to figure out that you
> probably meant it doesn't allow for direct BDD semantics.  My 2nd
> observation here is that BDD semantics can be aped in any given
> testing framework.  You just need to be flexible :)
>
> 3) You're doing a transformation (IE JSON input against a JSON
> schema).  You are testing for # of rows which is a good start.  But I
> don't think that really exercises a test against your JSON schema. I
> tend to view schema as the things that need the most rigorous testing
> (it's code after all).  IE I would want to confirm that the output
> matches the expected shape and values after being loaded against the
> schema.
>
> I saw a few minor spelling and grammatical issues as well.  I put a PR
> into your blog for them.  I won't be offended if you squish it :)
>
> I should be getting into our testing 'how-to' stuff this week.  I'll
> scrape our org specific stuff and put it up to github this week as
> well.  It'll be in python so maybe we'll get both use cases covered
> with examples :)
>
> G
>
> On 27 April 2017 at 03:46, Sam Elamin <hussam.ela...@gmail.com> wrote:
> > Hi
> >
> > @Lucas I certainly would love to write an integration testing library for
> > workflows, I have a few ideas I would love to share with others and they
> are
> > focused around Airflow since that is what we use
> >
> >
> > As promised here is the first blog post in a series of posts I hope to
> write
> > on how we build data pipelines
> >
> > Please feel free to retweet my original tweet and share because the more
> > ideas we have the better!
> >
> > Feedback is always welcome!
> >
> > Regards
> > Sam
> >
> > On Tue, Apr 25, 2017 at 10:32 PM, lucas.g...@gmail.com
> > <lucas.g...@gmail.com> wrote:
> >>
> >> Hi all, whoever (Sam I think) was going to do some work on doing a
> >> template testing pipeline.  I'd love to be involved, I have a current
> task
> >> in my day job (data engineer) to flesh out our testing how-to / best
> >> practices for Spark jobs and I think I'll be doing something very
> similar
> >> for the next week or 2.
> >>
> >> I'll scrape out what i have now in the next day or so and put it up in a
> >> gist that I can share too.
> >>
> >> G
> >>
> >> On 25 April 2017 at 13:04, Holden Karau <hol...@pigscanfly.ca> wrote:
> >>>
> >>> Urgh hangouts did something frustrating, updated link
> >>> https://hangouts.google.com/hangouts/_/ha6kusycp5fvzei2trhay4uhhqe
> >>>
> >>> On Mon, Apr 24, 2017 at 12:13 AM, Holden Karau <hol...@pigscanfly.ca>
> >>> wrote:
> >>>>
> >>>> The (tentative) link for those interested is
> >>>> https://hangouts.google.com/hangouts/_/oyjvcnffejcjhi6qazf3lysypue .
> >>>>
> >>>> On Mon, Apr 24, 2017 at 12:02 AM, Holden Karau <hol...@pigscanfly.ca>
> >>>> wrote:
> >>>>>
> >>>>> So 14 people have said they are available on Tuesday the 25th at 1PM
> >>>>> pacific so we will do this meeting then (
> >>>>

Re: Spark Testing Library Discussion

2017-04-27 Thread Sam Elamin
Hi

@Lucas I certainly would love to write an integration testing library for
workflows, I have a few ideas I would love to share with others and they
are focused around Airflow since that is what we use


As promised, here is the first blog post in a series of posts I hope to
write on how we build data pipelines

Please feel free to retweet my original tweet and share because the more
ideas we have the better!

Feedback is always welcome!

Regards
Sam

On Tue, Apr 25, 2017 at 10:32 PM, lucas.g...@gmail.com  wrote:

> Hi all, whoever (Sam I think) was going to do some work on doing a
> template testing pipeline.  I'd love to be involved, I have a current task
> in my day job (data engineer) to flesh out our testing how-to / best
> practices for Spark jobs and I think I'll be doing something very similar
> for the next week or 2.
>
> I'll scrape out what i have now in the next day or so and put it up in a
> gist that I can share too.
>
> G
>
> On 25 April 2017 at 13:04, Holden Karau  wrote:
>
>> Urgh hangouts did something frustrating, updated link
>> https://hangouts.google.com/hangouts/_/ha6kusycp5fvzei2trhay4uhhqe
>>
>> On Mon, Apr 24, 2017 at 12:13 AM, Holden Karau 
>> wrote:
>>
>>> The (tentative) link for those interested is https://hangouts.google.com
>>> /hangouts/_/oyjvcnffejcjhi6qazf3lysypue .
>>>
>>> On Mon, Apr 24, 2017 at 12:02 AM, Holden Karau 
>>> wrote:
>>>
 So 14 people have said they are available on Tuesday the 25th at 1PM
 pacific so we will do this meeting then ( https://doodle.com/poll/69y6
 yab4pyf7u8bn ).

 Since hangouts tends to work ok on the Linux distro I'm running my
 default is to host this as a "hangouts-on-air" unless there are alternative
 ideas.

 I'll record the hangout and if it isn't terrible I'll post it for those
 who weren't able to make it (and for next time I'll include more European
 friendly time options - Doodle wouldn't let me update it once posted).

 On Fri, Apr 14, 2017 at 11:17 AM, Holden Karau 
 wrote:

> Hi Spark Users (+ Some Spark Testing Devs on BCC),
>
> Awhile back on one of the many threads about testing in Spark there
> was some interest in having a chat about the state of Spark testing and
> what people want/need.
>
> So if you are interested in joining an online (with maybe an IRL
> component if enough people are SF based) chat about Spark testing please
> fill out this doodle - https://doodle.com/poll/69y6yab4pyf7u8bn
>
> I think reasonable topics of discussion could be:
>
> 1) What is the state of the different Spark testing libraries in the
> different core (Scala, Python, R, Java) and extended languages (C#,
> Javascript, etc.)?
> 2) How do we make these more easily discovered by users?
> 3) What are people looking for in their testing libraries that we are
> missing? (can be functionality, documentation, etc.)
> 4) Are there any examples of well tested open source Spark projects
> and where are they?
>
> If you have other topics that's awesome.
>
> To clarify this about libraries and best practices for people testing
> their Spark applications, and less about testing Spark's internals
> (although as illustrated by some of the libraries there is some strong
> overlap in what is required to make that work).
>
> Cheers,
>
> Holden :)
>
> --
> Cell : 425-233-8271 <(425)%20233-8271>
> Twitter: https://twitter.com/holdenkarau
>



 --
 Cell : 425-233-8271 <(425)%20233-8271>
 Twitter: https://twitter.com/holdenkarau

>>>
>>>
>>>
>>> --
>>> Cell : 425-233-8271 <(425)%20233-8271>
>>> Twitter: https://twitter.com/holdenkarau
>>>
>>
>>
>>
>> --
>> Cell : 425-233-8271 <(425)%20233-8271>
>> Twitter: https://twitter.com/holdenkarau
>>
>
>


Re: help/suggestions to setup spark cluster

2017-04-26 Thread Sam Elamin
Hi Anna

There are a variety of options for launching Spark clusters. I doubt people
run Spark on a single EC2 instance, certainly not in production.

I don't have enough information about what you are trying to do, but if you
are just trying to set things up from scratch then I think you can just use
EMR, which will create a cluster for you and attach a Zeppelin instance as well


You can also use databricks for ease of use and very little management but
you will pay a premium for that abstraction


Regards
Sam
On Wed, 26 Apr 2017 at 22:02, anna stax  wrote:

> I need to setup a spark cluster for Spark streaming and scheduled batch
> jobs and adhoc queries.
> Please give me some suggestions. Can this be done in standalone mode.
>
> Right now we have a spark cluster in standalone mode on AWS EC2 running
> spark streaming application. Can we run spark batch jobs and zeppelin on
> the same. Do we need a better resource manager like Mesos?
>
> Are there any companies or individuals that can help in setting this up?
>
> Thank you.
>
> -Anna
>


Re: How to convert Dstream of JsonObject to Dataframe in spark 2.1.0?

2017-04-24 Thread Sam Elamin
you have 2 options:
1) Clean: write your own parser to go through each property and create a
Dataset
2) Hacky but simple: convert to a JSON string then read it in using
spark.read.json(jsonString)

Please bear in mind the second option is expensive which is why it is hacky

I wrote my own parser here, which you can use to convert JsonObjects to
StructType schemas
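
A rough, untested sketch of option 2 using Spark 2.1 era APIs (the stream,
session and view names are illustrative):

import com.google.gson.JsonObject
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.DStream

// Serialise each Gson JsonObject back to a JSON string and let Spark infer the schema.
def toDataFrames(stream: DStream[JsonObject], spark: SparkSession): Unit = {
  stream.foreachRDD { rdd =>
    if (!rdd.isEmpty()) {
      val jsonStrings = rdd.map(_.toString)   // JsonObject.toString emits JSON
      val df = spark.read.json(jsonStrings)   // expensive: schema inference on every batch
      df.createOrReplaceTempView("events")
      spark.sql("select count(*) from events").show()
    }
  }
}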

Regards
Sam


On Sun, Apr 23, 2017 at 7:50 PM, kant kodali  wrote:

> Hi All,
>
> How to convert Dstream of JsonObject to Dataframe in spark 2.1.0? That
> JsonObject is from Gson Library.
>
> Thanks!
>


Deploying Spark Applications. Best Practices And Patterns

2017-04-12 Thread Sam Elamin
h tools to
>> use', but one step further, "what are the new tools and techniques to
>> use?".
>>
>> I look forward to whatever insight people have here.
>>
>>
>> My genuine advice to everyone in all spheres of activities will be to
>> first understand the problem to solve before solving it and definitely
>> before selecting the tools to solve it, otherwise you will land up with a
>> bowl of soup and fork in hand and argue that CI/ CD is still applicable to
>> building data products and data warehousing.
>>
>>
>> I concur
>>
>> Regards,
>> Gourav
>>
>>
>> -Steve
>>
>> On Wed, Apr 12, 2017 at 12:42 PM, Steve Loughran <ste...@hortonworks.com>
>> wrote:
>>
>>>
>>> On 11 Apr 2017, at 20:46, Gourav Sengupta <gourav.sengu...@gmail.com>
>>> wrote:
>>>
>>> And once again JAVA programmers are trying to solve a data analytics and
>>> data warehousing problem using programming paradigms. It genuinely a pain
>>> to see this happen.
>>>
>>>
>>>
>>> While I'm happy to be faulted for treating things as software processes,
>>> having a full automated mechanism for testing the latest code before
>>> production is something I'd consider foundational today. This is what
>>> "Contiunous Deployment" was about when it was first conceived. Does it mean
>>> you should blindly deploy that way? well, not if you worry about security,
>>> but having that review process and then a final manual "deploy" button can
>>> address that.
>>>
>>> Cloud infras let you integrate cluster instantiation to the process;
>>> which helps you automate things like "stage the deployment in some new VMs,
>>> run acceptance tests (*), then switch the load balancer over to the new
>>> cluster, being ready to switch back if you need. I've not tried that with
>>> streaming apps though; I don't know how to do it there. Boot the new
>>> cluster off checkpointed state requires deserialization to work, which
>>> can't be guaranteed if you are changing the objects which get serialized.
>>>
>>> I'd argue then, it's not a problem which has already been solved by data
>>> analystics/warehousing —though if you've got pointers there, I'd be
>>> grateful. Always good to see work by others. Indeed, the telecoms industry
>>> have led the way in testing and HA deployment: if you look at Erlang you
>>> can see a system designed with hot upgrades in mind, the way java code "add
>>> a JAR to a web server" never was.
>>>
>>> -Steve
>>>
>>>
>>> (*) do always make sure this is the test cluster with a snapshot of test
>>> data, not production machines/data. There are always horror stories there.
>>>
>>>
>>> Regards,
>>> Gourav
>>>
>>> On Tue, Apr 11, 2017 at 2:20 PM, Sam Elamin <hussam.ela...@gmail.com>
>>> wrote:
>>>
>>>> Hi Steve
>>>>
>>>>
>>>> Thanks for the detailed response, I think this problem doesn't have an
>>>> industry standard solution as of yet and I am sure a lot of people would
>>>> benefit from the discussion
>>>>
>>>> I realise now what you are saying so thanks for clarifying, that said
>>>> let me try and explain how we approached the problem
>>>>
>>>> There are 2 problems you highlighted, the first if moving the code from
>>>> SCM to prod, and the other is enusiring the data your code uses is correct.
>>>> (using the latest data from prod)
>>>>
>>>>
>>>> *"how do you get your code from SCM into production?"*
>>>>
>>>> We currently have our pipeline being run via airflow, we have our dags
>>>> in S3, with regards to how we get our code from SCM to production
>>>>
>>>> 1) Jenkins build that builds our spark applications and runs tests
>>>> 2) Once the first build is successful we trigger another build to copy
>>>> the dags to an s3 folder
>>>>
>>>> We then routinely sync this folder to the local airflow dags folder
>>>> every X amount of mins
>>>>
>>>> Re test data
>>>> *" but what's your strategy for test data: that's always the
>>>> troublespot."*
>>>>
>>>> Our application is using versioning against the data, so we expect the
>>>> source data to be in a certain version and the output data to also be in a
>>>> certain version
>>>>
>>>> We have a test resources folder that we have following the same
>>>> convention of versioning - this is the data that our application tests use
>>>> - to ensure that the data is in the correct format
>>>>
>>>> so for example if we have Table X with version 1 that depends on data
>>>> from Table A and B also version 1, we run our spark application then ensure
>>>> the transformed table X has the correct columns and row values
>>>>
>>>> Then when we have a new version 2 of the source data or adding a new
>>>> column in Table X (version 2), we generate a new version of the data and
>>>> ensure the tests are updated
>>>>
>>>> That way we ensure any new version of the data has tests against it
>>>>
>>>> *"I've never seen any good strategy there short of "throw it at a copy
>>>> of the production dataset"."*
>>>>
>>>> I agree which is why we have a sample of the production data and
>>>> version the schemas we expect the source and target data to look like.
>>>>
>>>> If people are interested I am happy writing a blog about it in the
>>>> hopes this helps people build more reliable pipelines
>>>>
>>>>
>>> Love to see that.
>>>
>>> Kind Regards
>>>> Sam
>>>>
>>>
>>>
>>
>>
>


Re: Spark Streaming. Real-time save data and visualize on dashboard

2017-04-12 Thread Sam Elamin
Hi

To be honest there are a variety of options but it all comes down to who
will be querying these dashboards.

If the end user is an engineer then the ELK stack is fine and I can attest
to the ease of use of kibana since I used it quite heavily.

On the other hand, in my experience it isn't the engineers that are in charge
of reporting, so if the end user is a data analyst or data scientist then
they are most comfortable using SQL and would be slightly averse to
learning the nuances of creating dashboards and using Elasticsearch. Trust
me, no matter how much you try, these folks are more comfortable using SQL
and Tableau-like platforms. So you will have to educate them, not to
mention the fact that any new hire will have to undergo the same training
to be productive

My suggestion for that is to push your data to Google BigQuery. It really is
simple to use and people can just focus on writing their queries. It also
returns within seconds for queries over terabytes of data. The caveat here is
that you are paying per query. But it's $5 for 1 TB, which is peanuts really.
It's a managed service so there are zero setup and management costs compared
to the other services. I suppose in the end you are paying to abstract that
knowledge away

Happy to answer any questions you might have

Kind Regards
Sam




On Wed, 12 Apr 2017 at 09:36, tencas  wrote:

> Hi Gaurav1809 ,
>
> I was thinking about using elasticsearch + kibana too (actually don't know
> the differences between ELK and elasticsearch).
> I was wondering about pros and cons of using a document indexer vs NoSQL
> database.
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Spark-Streaming-Real-time-save-data-
> and-visualize-on-dashboard-tp28587p28589.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: optimising storage and ec2 instances

2017-04-11 Thread Sam Elamin
Hi Zeming Yu, Steve

Just to add, we are also going down the partitioning route, but you should
know that if you are in AWS land you are most likely going to be using EMR
at any given time

At the moment EMR does not do recursive searches on wildcards, see this


However Spark seems to be able to deal with it fine, so if you don't have a
data serving layer for your customers then you should be fine

Regards
sam

On Tue, Apr 11, 2017 at 1:21 PM, Zeming Yu  wrote:

> everything works best if your sources are a few tens to hundreds of MB or
> more
>
> Are you referring to the size of the zip file or individual unzipped files?
>
> Any issues with storing a 60 mb zipped file containing heaps of text files
> inside?
>
> On 11 Apr. 2017 9:09 pm, "Steve Loughran"  wrote:
>
>>
>> > On 11 Apr 2017, at 11:07, Zeming Yu  wrote:
>> >
>> > Hi all,
>> >
>> > I'm a beginner with spark, and I'm wondering if someone could provide
>> guidance on the following 2 questions I have.
>> >
>> > Background: I have a data set growing by 6 TB p.a. I plan to use spark
>> to read in all the data, manipulate it and build a predictive model on it
>> (say GBM) I plan to store the data in S3, and use EMR to launch spark,
>> reading in data from S3.
>> >
>> > 1. Which option is best for storing the data on S3 for the purpose of
>> analysing it in EMR spark?
>> > Option A: storing the 6TB file as 173 million individual text files
>> > Option B: zipping up the above 173 million text files as 240,000 zip
>> files
>> > Option C: appending the individual text files, so have 240,000 text
>> files p.a.
>> > Option D: combining the text files even further
>> >
>>
>> everything works best if your sources are a few tens to hundreds of MB or
>> more of your data, work can be partitioned up by file. If you use more
>> structured formats (avro compressed with snappy, etc), you can throw > 1
>> executor at work inside a file. Structure is handy all round, even if its
>> just adding timestamp and provenance columns to each data file.
>>
>> there's the HAR file format from Hadoop which can merge lots of small
>> files into larger ones, allowing work to be scheduled per har file.
>> Recommended for HDFS as it hates small files, on S3 you still have limits
>> on small files (including throttling of HTTP requests to shards of a
>> bucket), but they are less significant.
>>
>> One thing to be aware is that the s3 clients spark use are very
>> inefficient in listing wide directory trees, and Spark not always the best
>> at partitioning work because of this. You can accidentally create a very
>> inefficient tree structure like datasets/year=2017/month=5/day=10/hour=12/,
>> with only one file per hour. Listing and partitioning suffers here, and
>> while s3a on Hadoop 2.8 is better here, Spark hasn't yet fully adapted to
>> those changes (use of specific API calls). There's also a lot more to be
>> done in S3A to handle wildcards in the directory tree much more efficiently
>> (HADOOP-13204); needs to address pattens like 
>> (datasets/year=201?/month=*/day=10)
>> without treewalking and without fetching too much data from wildcards near
>> the top of the tree. We need to avoid implementing something which works
>> well on *my* layouts, but absolutely dies on other people's. As is usual in
>> OSS, help welcome; early testing here as critical as coding, so as to
>> ensure things will work with your file structures
>>
>> -Steve
>>
>>
>> > 2. Any recommendations on the EMR set up to analyse the 6TB of data all
>> at once and build a GBM, in terms of
>> > 1) The type of EC2 instances I need?
>> > 2) The number of such instances I need?
>> > 3) Rough estimate of cost?
>> >
>>
>> no opinion there
>>
>> >
>> > Thanks so much,
>> > Zeming
>> >
>>
>>


Re: What is the best way to run a scheduled spark batch job on AWS EC2 ?

2017-04-11 Thread Sam Elamin
Hi Steve


Thanks for the detailed response, I think this problem doesn't have an
industry standard solution as of yet and I am sure a lot of people would
benefit from the discussion

I realise now what you are saying so thanks for clarifying, that said let
me try and explain how we approached the problem

There are 2 problems you highlighted: the first is moving the code from SCM
to prod, and the other is ensuring the data your code uses is correct
(using the latest data from prod).


*"how do you get your code from SCM into production?"*

We currently have our pipeline being run via Airflow, and we have our DAGs
in S3. With regards to how we get our code from SCM to production:

1) A Jenkins build that builds our Spark applications and runs tests
2) Once the first build is successful we trigger another build to copy the
DAGs to an S3 folder

We then routinely sync this folder to the local Airflow DAGs folder every X
minutes
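
For the sync itself a cron entry along these lines does the job (bucket
name, paths and schedule are illustrative):

# every 5 minutes, mirror the DAGs bucket into the local Airflow dags folder
*/5 * * * * aws s3 sync s3://my-dags-bucket/dags /home/airflow/dags --delete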

Re test data
*" but what's your strategy for test data: that's always the troublespot."*

Our application is using versioning against the data, so we expect the
source data to be in a certain version and the output data to also be in a
certain version

We have a test resources folder that we have following the same convention
of versioning - this is the data that our application tests use - to ensure
that the data is in the correct format

so for example if we have Table X with version 1 that depends on data from
Table A and B also version 1, we run our spark application then ensure the
transformed table X has the correct columns and row values

Then when we have a new version 2 of the source data or adding a new column
in Table X (version 2), we generate a new version of the data and ensure
the tests are updated

That way we ensure any new version of the data has tests against it

*"I've never seen any good strategy there short of "throw it at a copy of
the production dataset"."*

I agree which is why we have a sample of the production data and version
the schemas we expect the source and target data to look like.

If people are interested I am happy to write a blog about it in the hope
that this helps people build more reliable pipelines

Kind Regards
Sam










On Tue, Apr 11, 2017 at 11:31 AM, Steve Loughran <ste...@hortonworks.com>
wrote:

>
> On 7 Apr 2017, at 18:40, Sam Elamin <hussam.ela...@gmail.com> wrote:
>
> Definitely agree with gourav there. I wouldn't want jenkins to run my work
> flow. Seems to me that you would only be using jenkins for its scheduling
> capabilities
>
>
> Maybe I was just looking at this differenlty
>
> Yes you can run tests but you wouldn't want it to run your orchestration
> of jobs
>
> What happens if jenkijs goes down for any particular reason. How do you
> have the conversation with your stakeholders that your pipeline is not
> working and they don't have data because the build server is going through
> an upgrade or going through an upgrade
>
>
>
> Well, I wouldn't use it as a replacement for Oozie, but I'd certainly
> consider as the pipeline for getting your code out to the cluster, so you
> don't have to explain why you just pushed out something broken
>
> As example, here's Renault's pipeline as discussed last week in Munich
> https://flic.kr/p/Tw3Emu
>
> However to be fair I understand what you are saying Steve if someone is in
> a place where you only have access to jenkins and have to go through hoops
> to setup:get access to new instances then engineers will do what they
> always do, find ways to game the system to get their work done
>
>
>
>
> This isn't about trying to "Game the system", this is about what makes a
> replicable workflow for getting code into production, either at the press
> of a button or as part of a scheduled "we push out an update every night,
> rerun the deployment tests and then switch over to the new installation"
> mech.
>
> Put differently: how do you get your code from SCM into production? Not
> just for CI, but what's your strategy for test data: that's always the
> troublespot. Random selection of rows may work, although it will skip the
> odd outlier (high-unicode char in what should be a LATIN-1 field, time set
> to 0, etc), and for work joining > 1 table, you need rows which join well.
> I've never seen any good strategy there short of "throw it at a copy of the
> production dataset".
>
>
> -Steve
>
>
>
>
>
>
> On Fri, 7 Apr 2017 at 16:17, Gourav Sengupta <gourav.sengu...@gmail.com>
> wrote:
>
>> Hi Steve,
>>
>> Why would you ever do that? You are suggesting the use of a CI tool as a
>> workflow and orchestration engine.
>>
>> Regards,
>> Gourav Sengupta
>>
>> On Fri, Apr 7, 2017 at 

Re: What is the best way to run a scheduled spark batch job on AWS EC2 ?

2017-04-07 Thread Sam Elamin
Definitely agree with Gourav there. I wouldn't want Jenkins to run my
workflow. It seems to me that you would only be using Jenkins for its
scheduling capabilities

Yes you can run tests but you wouldn't want it to run your orchestration of
jobs

What happens if Jenkins goes down for any particular reason? How do you
have the conversation with your stakeholders that your pipeline is not
working and they don't have data because the build server is going through
an upgrade?

However, to be fair, I understand what you are saying, Steve: if someone is
in a place where they only have access to Jenkins and have to go through
hoops to set up or get access to new instances, then engineers will do what
they always do and find ways to game the system to get their work done




On Fri, 7 Apr 2017 at 16:17, Gourav Sengupta <gourav.sengu...@gmail.com>
wrote:

> Hi Steve,
>
> Why would you ever do that? You are suggesting the use of a CI tool as a
> workflow and orchestration engine.
>
> Regards,
> Gourav Sengupta
>
> On Fri, Apr 7, 2017 at 4:07 PM, Steve Loughran <ste...@hortonworks.com>
> wrote:
>
>> If you have Jenkins set up for some CI workflow, that can do scheduled
>> builds and tests. Works well if you can do some build test before even
>> submitting it to a remote cluster
>>
>> On 7 Apr 2017, at 10:15, Sam Elamin <hussam.ela...@gmail.com> wrote:
>>
>> Hi Shyla
>>
>> You have multiple options really some of which have been already listed
>> but let me try and clarify
>>
>> Assuming you have a spark application in a jar you have a variety of
>> options
>>
>> You have to have an existing spark cluster that is either running on EMR
>> or somewhere else.
>>
>> *Super simple / hacky*
>> Cron job on EC2 that calls a simple shell script that does a spart submit
>> to a Spark Cluster OR create or add step to an EMR cluster
>>
>> *More Elegant*
>> Airflow/Luigi/AWS Data Pipeline (Which is just CRON in the UI ) that will
>> do the above step but have scheduling and potential backfilling and error
>> handling(retries,alerts etc)
>>
>> AWS are coming out with glue <https://aws.amazon.com/glue/> soon that
>> does some Spark jobs but I do not think its available worldwide just yet
>>
>> Hope I cleared things up
>>
>> Regards
>> Sam
>>
>>
>> On Fri, Apr 7, 2017 at 6:05 AM, Gourav Sengupta <
>> gourav.sengu...@gmail.com> wrote:
>>
>>> Hi Shyla,
>>>
>>> why would you want to schedule a spark job in EC2 instead of EMR?
>>>
>>> Regards,
>>> Gourav
>>>
>>> On Fri, Apr 7, 2017 at 1:04 AM, shyla deshpande <
>>> deshpandesh...@gmail.com> wrote:
>>>
>>>> I want to run a spark batch job maybe hourly on AWS EC2 .  What is the
>>>> easiest way to do this. Thanks
>>>>
>>>
>>>
>>
>>
>


Re: What is the best way to run a scheduled spark batch job on AWS EC2 ?

2017-04-07 Thread Sam Elamin
Hi Shyla

You have multiple options really some of which have been already listed but
let me try and clarify

Assuming you have a spark application in a jar you have a variety of options

You have to have an existing spark cluster that is either running on EMR or
somewhere else.

*Super simple / hacky*
Cron job on EC2 that calls a simple shell script that does a spark-submit
to a Spark cluster, OR creates or adds a step to an EMR cluster
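
For example (paths, schedule, class name and master URL are all illustrative):

# crontab entry: kick off the batch job at the top of every hour
0 * * * * /home/ubuntu/run_job.sh >> /home/ubuntu/run_job.log 2>&1

# /home/ubuntu/run_job.sh
#!/bin/bash
/usr/local/spark/bin/spark-submit \
  --master spark://your-master-host:7077 \
  --class com.example.YourBatchJob \
  /home/ubuntu/jobs/your-batch-job.jar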

*More Elegant*
Airflow/Luigi/AWS Data Pipeline (Which is just CRON in the UI ) that will
do the above step but have scheduling and potential backfilling and error
handling(retries,alerts etc)

AWS are coming out with Glue soon that does some Spark jobs, but I do not
think it's available worldwide just yet

Hope I cleared things up

Regards
Sam


On Fri, Apr 7, 2017 at 6:05 AM, Gourav Sengupta 
wrote:

> Hi Shyla,
>
> why would you want to schedule a spark job in EC2 instead of EMR?
>
> Regards,
> Gourav
>
> On Fri, Apr 7, 2017 at 1:04 AM, shyla deshpande 
> wrote:
>
>> I want to run a spark batch job maybe hourly on AWS EC2 .  What is the
>> easiest way to do this. Thanks
>>
>
>


Re: Executor unable to pick postgres driver in Spark standalone cluster

2017-04-04 Thread Sam Elamin
Hi Rishikesh,

Sounds like the Postgres driver isn't being loaded on the classpath. To try
and debug it, try submitting the application with --jars (note that --jars
must come before the application jar, otherwise it is treated as an
application argument)

e.g.

spark-submit --jars /home/ubuntu/downloads/postgres/postgresql-9.4-1200-jdbc41.jar {application.jar}


If that does not work then there is a problem in the application itself, and
the reason it works locally is that you have the dependency on your local
classpath


Regards
Sam

On Mon, Apr 3, 2017 at 2:43 PM, Rishikesh Teke 
wrote:

>
> Hi all,
>
> I was submitting the play application to spark 2.1 standalone cluster . In
> play application postgres dependency is also added and application works on
> local spark libraries. But at run time on standalone cluster it gives me
> error :
>
> o.a.s.s.TaskSetManager - Lost task 0.0 in stage 0.0 (TID 1, 172.31.21.3,
> executor 1): java.lang.ClassNotFoundException: org.postgresql
> .Driver
>
> I have placed following in spark-defaults.conf directory
>
> spark.executor.extraClassPath
> /home/ubuntu/downloads/postgres/postgresql-9.4-1200-jdbc41.jar
> spark.driver.extraClassPath
> /home/ubuntu/downloads/postgres/postgresql-9.4-1200-jdbc41.jar
>
> Still executors unable to pick the driver.
> Am i missing something? Need help .
> Thanks.
>
>
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Executor-unable-to-pick-postgres-driver-in-Spark-
> standalone-cluster-tp28563.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Contributing to Spark

2017-03-19 Thread Sam Elamin
Hi All,

I would like to start contributing to Spark if possible, its an amazing
technology and I would love to get involved


The contributing page states: "consult the list of starter tasks in JIRA, or
ask the user@spark.apache.org mailing list."


Can anyone guide me on where is best to start? What are these "starter
tasks"?

I was thinking adding tests would be a good place to begin when dealing
with any new code base, perhaps to Pyspark since Scala seems a bit more
stable


Also - if at all possible - I would really appreciate if any of the
contributors or PMC members would be willing to mentor or guide me in this.
Any help would be greatly appreciated!


Regards
Sam


Re: Spark and continuous integration

2017-03-14 Thread Sam Elamin
Thank you both

Steve that's a very interesting point. I have to admit I have never thought
of doing analysis over time on the tests but it makes sense as the failures
over time tell you quite a bit about your data platform

Thanks for highlighting! We are using Pyspark for now so I hope some
frameworks help with that.

Previously we have built data sanity checks that look at counts and numbers
to produce graphs using statsd and Grafana (elk stack) but not necessarily
looking at test metrics


I'll definitely check it out

Kind regards
Sam
On Tue, 14 Mar 2017 at 11:57, Jörn Franke <jornfra...@gmail.com> wrote:

> I agree the reporting is an important aspect. Sonarqube (or similar tool)
> can report over time, but does not support Scala (well indirectly via
> JaCoCo). In the end, you will need to think about a dashboard that displays
> results over time.
>
> On 14 Mar 2017, at 12:44, Steve Loughran <ste...@hortonworks.com> wrote:
>
>
> On 13 Mar 2017, at 13:24, Sam Elamin <hussam.ela...@gmail.com> wrote:
>
> Hi Jorn
>
> Thanks for the prompt reply, really we have 2 main concerns with CD,
> ensuring tests pasts and linting on the code.
>
>
> I'd add "providing diagnostics when tests fail", which is a combination
> of: tests providing useful information and CI tooling collecting all those
> results and presenting them meaningfully. The hard parts are invariably (at
> least for me)
>
> -what to do about the intermittent failures
> -tradeoff between thorough testing and fast testing, especially when
> thorough means "better/larger datasets"
>
> You can consider the output of jenkins & tests as data sources for your
> own analysis too: track failure rates over time, test runs over time, etc:
> could be interesting. If you want to go there, then the question of "which
> CI toolings produce the most interesting machine-parseable results, above
> and beyond the classic Ant-originated XML test run reports"
>
> I have mixed feelings about scalatest there: I think the expression
> language is good, but the maven test runner doesn't report that well, at
> least for me:
>
>
> https://steveloughran.blogspot.co.uk/2016/09/scalatest-thoughts-and-ideas.html
>
>
>
> I think all platforms should handle this with ease, I was just wondering
> what people are using.
>
> Jenkins seems to have the best spark plugins so we are investigating that
> as well as a variety of other hosted CI tools
>
> Happy to write a blog post detailing our findings and sharing it here if
> people are interested
>
>
> Regards
> Sam
>
> On Mon, Mar 13, 2017 at 1:18 PM, Jörn Franke <jornfra...@gmail.com> wrote:
>
> Hi,
>
> Jenkins also now supports pipeline as code and multibranch pipelines. thus
> you are not so dependent on the UI and you do not need anymore a long list
> of jobs for different branches. Additionally it has a new UI (beta) called
> blueocean, which is a little bit nicer. You may also check GoCD. Aside from
> this you have a huge variety of commercial tools, e.g. Bamboo.
> In the cloud, I use for my open source github projects Travis-Ci, but
> there are also a lot of alternatives, e.g. Distelli.
>
> It really depends what you expect, e.g. If you want to Version the build
> pipeline in GIT, if you need Docker deployment etc. I am not sure if new
> starters should be responsible for the build pipeline, thus I am not sure
> that i understand  your concern in this area.
>
> From my experience, integration tests for Spark can be run on any of these
> platforms.
>
> Best regards
>
> > On 13 Mar 2017, at 10:55, Sam Elamin <hussam.ela...@gmail.com> wrote:
> >
> > Hi Folks
> >
> > This is more of a general question. What's everyone using for their CI
> /CD when it comes to spark
> >
> > We are using Pyspark but potentially looking to make to spark scala and
> Sbt in the future
> >
> >
> > One of the suggestions was jenkins but I know the UI isn't great for new
> starters so I'd rather avoid it. I've used team city but that was more
> focused on dot net development
> >
> >
> > What are people using?
> >
> > Kind Regards
> > Sam
>
>
>
>


Re: Spark and continuous integration

2017-03-13 Thread Sam Elamin
Hi Jorn

Thanks for the prompt reply. Really we have 2 main concerns with CD:
ensuring tests pass and linting the code.

I think all platforms should handle this with ease, I was just wondering
what people are using.

Jenkins seems to have the best spark plugins so we are investigating that
as well as a variety of other hosted CI tools

Happy to write a blog post detailing our findings and sharing it here if
people are interested


Regards
Sam

On Mon, Mar 13, 2017 at 1:18 PM, Jörn Franke <jornfra...@gmail.com> wrote:

> Hi,
>
> Jenkins also now supports pipeline as code and multibranch pipelines. thus
> you are not so dependent on the UI and you do not need anymore a long list
> of jobs for different branches. Additionally it has a new UI (beta) called
> blueocean, which is a little bit nicer. You may also check GoCD. Aside from
> this you have a huge variety of commercial tools, e.g. Bamboo.
> In the cloud, I use for my open source github projects Travis-Ci, but
> there are also a lot of alternatives, e.g. Distelli.
>
> It really depends what you expect, e.g. If you want to Version the build
> pipeline in GIT, if you need Docker deployment etc. I am not sure if new
> starters should be responsible for the build pipeline, thus I am not sure
> that i understand  your concern in this area.
>
> From my experience, integration tests for Spark can be run on any of these
> platforms.
>
> Best regards
>
> > On 13 Mar 2017, at 10:55, Sam Elamin <hussam.ela...@gmail.com> wrote:
> >
> > Hi Folks
> >
> > This is more of a general question. What's everyone using for their CI
> /CD when it comes to spark
> >
> > We are using Pyspark but potentially looking to make to spark scala and
> Sbt in the future
> >
> >
> > One of the suggestions was jenkins but I know the UI isn't great for new
> starters so I'd rather avoid it. I've used team city but that was more
> focused on dot net development
> >
> >
> > What are people using?
> >
> > Kind Regards
> > Sam
>


Spark and continuous integration

2017-03-13 Thread Sam Elamin
Hi Folks

This is more of a general question. What's everyone using for their CI/CD
when it comes to Spark?

We are using Pyspark but potentially looking to move to Spark with Scala and
sbt in the future


One of the suggestions was Jenkins, but I know the UI isn't great for new
starters so I'd rather avoid it. I've used TeamCity but that was more
focused on .NET development


What are people using?

Kind Regards
Sam


Re: How to unit test spark streaming?

2017-03-07 Thread Sam Elamin
Hey kant

You can use Holden's spark-testing-base

Have a look at some of the specs I wrote here to give you an idea

https://github.com/samelamin/spark-bigquery/blob/master/src/test/scala/com/samelamin/spark/bigquery/BigQuerySchemaSpecs.scala

Basically you abstract your transformations to take in a dataframe and
return one, then you assert on the returned df

Regards
Sam
On Tue, 7 Mar 2017 at 12:05, kant kodali  wrote:

> Hi All,
>
> How to unit test spark streaming or spark in general? How do I test the
> results of my transformations? Also, more importantly don't we need to
> spawn master and worker JVM's either in one or multiple nodes?
>
> Thanks!
> kant
>


Re: using spark to load a data warehouse in real time

2017-03-01 Thread Sam Elamin
Hi Adaryl

Having come from a Web background myself I completely understand your
confusion so let me try to clarify a few things

First and foremost, Spark is a data processing engine, not a general
framework. In the web applications and frameworks world you load the
entities, map them to the UI and serve them up to the users, then save
whatever you need back to the database via some sort of entity mapping,
whether that's an ORM, stored procedures or any other mechanism.

Spark, as I mentioned, is a data processing engine, so there is no concept of
an ORM or data mapper. You can give it the schema of what you expect the
data to look like. It also works well with most of the data formats being
used in the industry, like CSV, JSON, Avro and Parquet, including inferring
the schema from the data provided, making it much easier to develop and maintain

Now as to your question of loading data in real time, it absolutely can be
done. Traditionally data coming in arrives at a location most people call
the landing zone. This is where the extract part of the ETL begins.

As Jörn mentioned, Spark Streaming isn't meant to write to a database, but
you can write to Kafka or Kinesis to feed a pipeline, then have another
process consume from them and write to your end datastore.

The creators of Spark realised that your use case is absolutely valid
and almost everyone they talked to said that streaming on its own wasn't
enough; for this very same reason the concept of Structured Streaming was
introduced.

See this blog post from Databricks:

https://databricks.com/blog/2016/07/28/structured-streaming-in-apache-spark.html


You can potentially use the Structured Streaming APIs to continually read
changes from HDFS, or in your case S3, then write them out via JDBC to your
end datastore
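
To make that concrete, a rough, untested sketch (this uses foreachBatch,
which only exists in later Spark versions; the schema, paths, table name and
connection details are all illustrative):

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.StructType

val spark = SparkSession.builder().appName("s3-to-dwh").getOrCreate()

// streaming file sources need an explicit schema
val schema = new StructType()
  .add("id", "long")
  .add("amount", "double")
  .add("created_at", "timestamp")

val incoming = spark.readStream.schema(schema).json("s3a://my-landing-bucket/orders/")

val jdbcUrl = "jdbc:postgresql://dwh-host:5432/warehouse"
val props = new java.util.Properties()
props.setProperty("user", "loader")
props.setProperty("password", "secret")

incoming.writeStream
  .foreachBatch { (batch: DataFrame, batchId: Long) =>
    // each micro-batch is a plain DataFrame, so the normal JDBC writer applies
    batch.write.mode("append").jdbc(jdbcUrl, "staging.orders", props)
  }
  .option("checkpointLocation", "s3a://my-landing-bucket/checkpoints/orders/")
  .start()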

I have done it before so I'll give you a few gotchas to be aware of

The most important one is that your end datastore or data warehouse needs to
support streaming inserts; some are better than others. Redshift
specifically is really bad when it comes to small, very frequent deltas,
which is what streaming at high scale produces

The second is that Structured Streaming is still in the alpha phase and the
code is marked as experimental. That's not to say it will die the minute
you push any load through, because I found that it handled GBs of data well.
The pain I found is that the underlying goal of Structured Streaming was
to reuse the underlying DataFrame APIs, hence unifying the batch and stream
data types, meaning you only need to learn one. However, some methods don't
yet work on streaming dataframes, such as dropDuplicates


That's pretty much it. So really it comes down to your use case: if you
need the data to be reliable and never go down then implement Kafka or
Kinesis. If it's a proof of concept or you are trying to validate a theory,
use Structured Streaming as it's much quicker to write, weeks and months of
set up vs a few hours


I hope I clarified things for you

Regards
Sam

Sent from my iPhone




On Wed, 1 Mar 2017 at 07:34, Jörn Franke  wrote:

I am not sure that Spark Streaming is what you want to do. It is for
streaming analytics not for loading in a DWH.

You need also define what realtime means and what is needed there - it will
differ from client to client significantly.

From my experience, just SQL is not enough for the users in the future.
Especially large data volumes require much more beyond just aggregations.
These may become less useful in context of large data volumes. They have to
learn new ways of dealing with the data from a business perspective by
employing proper sampling of data from a large dataset, machine learning
approaches etc. These are new methods which are not technically driven but
business driven. I think it is wrong to assume that users learning new
skills is a bad thing; it might be in the future a necessity.

On 28 Feb 2017, at 23:18, Adaryl Wakefield 
wrote:

I’m actually trying to come up with a generalized use case that I can take
from client to client. We have structured data coming from some
application. Instead of dropping it into Hadoop and then using yet another
technology to query that data, I just want to dump it into a relational MPP
DW so nobody has to learn new skills or new tech just to do some analysis.
Everybody and their mom can write SQL. Designing relational databases is a
rare skill but not as rare as what is necessary for designing some NoSQL
solutions.



I’m looking for the fastest path to move a company from batch to real time
analytical processing.



Adaryl "Bob" Wakefield, MBA
Principal
Mass Street Analytics, LLC
913.938.6685

www.massstreet.net

www.linkedin.com/in/bobwakefieldmba
Twitter: @BobLovesData



*From:* Mohammad Tariq [mailto:donta...@gmail.com ]
*Sent:* Tuesday, February 28, 2017 12:57 PM
*To:* Adaryl Wakefield 
*Cc:* user@spark.apache.org
*Subject:* Re: using spark to load a data 

Re: Structured Streaming: How to handle bad input

2017-02-23 Thread Sam Elamin
Hi Jayesh

So you have 2 problems here

1) Data was loaded in the wrong format
2) Once you've handled the wrong data, the spark job will continually retry
the failed batch

For 2 it's very easy to go into the checkpoint directory and delete that
offset manually and make it seem like it never happened.

However point 1 is a little bit trickier. If you receive
bad data then perhaps your first port of call should be a cleaning process
to ensure your data is at least parsable, then move it to another directory
which spark streaming is looking at

It is unreasonable to have spark both do the streaming and handle bad data
for you yet remain extremely simple and easy to use

That said I personally would have a conversation with the provider of the
data


In this scenario I just ensure that these providers guarantee the format of
the data is correct, whether it's CSV, JSON, AVRO, PARQUET or whatever. I
should hope whatever service/company is providing this data is providing it
"correctly" to a set definition, otherwise you will have to do a
pre-cleaning step
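
To make the pre-cleaning step concrete, here is a rough sketch using Spark's
permissive parse mode; the schema, paths and the CSV corrupt-record option
(which needs a reasonably recent Spark release) are all illustrative:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

val spark = SparkSession.builder().appName("pre-clean").getOrCreate()

// Expected columns plus a string column that collects anything unparsable
val expected = new StructType()
  .add("name", StringType)
  .add("score", IntegerType)
  .add("_corrupt_record", StringType)

val raw = spark.read
  .schema(expected)
  .option("mode", "PERMISSIVE")  // keep malformed rows instead of failing the job
  .option("columnNameOfCorruptRecord", "_corrupt_record")
  .csv("s3a://my-bucket/raw/")   // hypothetical drop zone
  .cache()                       // parse once, reuse for both filters below

val good = raw.filter(raw("_corrupt_record").isNull).drop("_corrupt_record")
val bad  = raw.filter(raw("_corrupt_record").isNotNull)

// Land clean rows where the streaming job is watching, park the rest for inspection
good.write.mode("append").json("s3a://my-bucket/clean/")
bad.write.mode("append").json("s3a://my-bucket/quarantine/")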


Perhaps someone else can suggest a better/cleaner approach

Regards
Sam







On Thu, Feb 23, 2017 at 2:09 PM, JayeshLalwani <
jayesh.lalw...@capitalone.com> wrote:

> What is a good way to make a Structured Streaming application deal with bad
> input? Right now, the problem is that bad input kills the Structured
> Streaming application. This is highly undesirable, because a Structured
> Streaming application has to be always on
>
> For example, here is a very simple structured streaming program
>
>
>
>
> Now, I drop in a CSV file with the following data into my bucket
>
>
>
> Obviously the data is in the wrong format
>
> The executor and driver come crashing down
> 17/02/23 08:53:40 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
> java.lang.NumberFormatException: For input string: "Iron man"
> at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
> at java.lang.Integer.parseInt(Integer.java:580)
> at java.lang.Integer.parseInt(Integer.java:615)
> at scala.collection.immutable.StringLike$class.toInt(StringLike.scala:272)
> at scala.collection.immutable.StringOps.toInt(StringOps.scala:29)
> at org.apache.spark.sql.execution.datasources.csv.CSVTypeCast$.castTo(CSVInferSchema.scala:250)
> at org.apache.spark.sql.execution.datasources.csv.CSVRelation$$anonfun$csvParser$3.apply(CSVRelation.scala:125)
> at org.apache.spark.sql.execution.datasources.csv.CSVRelation$$anonfun$csvParser$3.apply(CSVRelation.scala:94)
> at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anonfun$buildReader$1$$anonfun$apply$2.apply(CSVFileFormat.scala:167)
> at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anonfun$buildReader$1$$anonfun$apply$2.apply(CSVFileFormat.scala:166)
> at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:102)
> at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:166)
> at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:102)
> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
> at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
> at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
> at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
> at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
> at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
> at org.apache.spark.scheduler.Task.run(Task.scala:99)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> 

Re: quick question: best to use cluster mode or client mode for production?

2017-02-23 Thread Sam Elamin
I personally use spark-submit as it's agnostic to whichever platform your
Spark clusters are running on, e.g. EMR, Dataproc, Databricks etc.
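
For the cluster-mode route, the submit command ends up looking roughly like
this (the class name, jar location and sizing flags are placeholders for
whatever your packaged job actually uses):

spark-submit \
  --class com.example.HiveQueryJob \
  --master yarn \
  --deploy-mode cluster \
  --num-executors 10 \
  --executor-memory 4G \
  s3://my-bucket/jars/hive-query-job.jar

Cluster mode keeps the driver on the cluster, so the job survives your edge
node or laptop disconnecting, which is usually what you want for a scheduled
production run.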


On Thu, 23 Feb 2017 at 08:53, nancy henry  wrote:

> Hi Team,
>
> I have set of hc.sql("hivequery") kind of scripts which i am running right
> now in spark-shell
>
> How should i schedule it in production
> making it spark-shell -i script.scala
> or keeping it in jar file through eclipse and use spark-submit deploy mode
> cluster?
>
> which is advisable?
>


Re: [Spark Streaming] Starting Spark Streaming application from a specific position in Kinesis stream

2017-02-19 Thread Sam Elamin
Just doing a bit of research, it seems we've been beaten to the punch; there's
already a connector you can use here
<https://github.com/maropu/spark-kinesis-sql-asl/issues/4>

Give it a go and feel free to give the committer feedback, or better yet send
some PRs if it needs them :)

On Sun, Feb 19, 2017 at 9:23 PM, Sam Elamin <hussam.ela...@gmail.com> wrote:

> Hey Neil
>
> No worries! Happy to help you write it if you want, just link me to the
> repo and we can write it together
>
> Would be fun!
>
>
> Regards
> Sam
> On Sun, 19 Feb 2017 at 21:21, Neil Maheshwari <neil.v.maheshw...@gmail.com>
> wrote:
>
>> Thanks for the advice Sam. I will look into implementing a structured
>> streaming connector.
>>
>> On Feb 19, 2017, at 11:54 AM, Sam Elamin <hussam.ela...@gmail.com> wrote:
>>
>> HI Niel,
>>
>> My advice would be to write a structured streaming connector. The new
>> structured streaming APIs were brought in to handle exactly the issues you
>> describe
>>
>> See this blog
>> <https://databricks.com/blog/2016/07/28/structured-streaming-in-apache-spark.html>
>>
>> There isnt a structured streaming connector as of yet, but you can easily
>> write one that uses the underlying batch methods to read/write to Kinesis
>>
>> Have a look at how I wrote my bigquery connector here
>> <http://github.com/samelamin/spark-bigquery>. Plus the best thing is we
>> get a new connector to a highly used datasource/sink
>>
>> Hope that helps
>>
>> Regards
>> Sam
>>
>> On Sun, Feb 19, 2017 at 5:53 PM, Neil Maheshwari <
>> neil.v.maheshw...@gmail.com> wrote:
>>
>> Thanks for your response Ayan.
>>
>> This could be an option. One complication I see with that approach is
>> that I do not want to miss any records that are between the data we have
>> batched to the data store and the checkpoint. I would still need a
>> mechanism for recording the sequence number of the last time the data was
>> batched, so I could start the streaming application after that sequence
>> number.
>>
>> A similar approach could be to batch our data periodically, recording the
>> last sequence number of the batch. Then, fetch data from Kinesis using the
>> low level API to read data from the latest sequence number of the batched
>> data up until the sequence number of the latest checkpoint from our spark
>> app. I could merge batched dataset and the dataset fetched from Kinesis’s
>> lower level API, and use that dataset as an RDD to prep the job.
>>
>> On Feb 19, 2017, at 3:12 AM, ayan guha <guha.a...@gmail.com> wrote:
>>
>> Hi
>>
>> AFAIK, Kinesis does not provide any mechanism other than check point to
>> restart. That makes sense as it makes it so generic.
>>
>> Question: why cant you warm up your data from a data store? Say every 30
>> mins you run a job to aggregate your data to a data store for that hour.
>> When you restart the streaming app it would read from dynamo check point,
>> but it would also preps an initial rdd from data store?
>>
>> Best
>> Ayan
>> On Sun, 19 Feb 2017 at 8:29 pm, Neil Maheshwari <
>> neil.v.maheshw...@gmail.com> wrote:
>>
>> Hello,
>>
>> I am building a Spark streaming application that ingests data from an
>> Amazon Kinesis stream. My application keeps track of the minimum price over
>> a window for groups of similar tickets. When I deploy the application, I
>> would like it to start processing at the start of the previous hours data.
>> This will warm up the state of the application and allow us to deploy our
>> application faster. For example, if I start the application at 3 PM, I
>> would like to process the data retained by Kinesis from 2PM to 3PM, and
>> then continue receiving data going forward. Spark Streaming’s Kinesis
>> receiver, which relies on the Amazon Kinesis Client Library, seems to give
>> me three options for choosing where to read from the stream:
>>
>>- read from the latest checkpointed sequence number in Dynamo
>>- start from the oldest record in the stream (TRIM_HORIZON shard
>>iterator type)
>>- start from the most recent record in the stream (LATEST shard
>>iterator type)
>>
>>
>> Do you have any suggestions on how we could start our application at a
>> specific timestamp or sequence number in the Kinesis stream? Some ideas I
>> had were:
>>
>>- Create a KCL application that fetches the previous hour data and
>>writes it to HDFS. We can create an RDD from 

Re: [Spark Streaming] Starting Spark Streaming application from a specific position in Kinesis stream

2017-02-19 Thread Sam Elamin
Hey Neil

No worries! Happy to help you write it if you want, just link me to the
repo and we can write it together

Would be fun!


Regards
Sam
On Sun, 19 Feb 2017 at 21:21, Neil Maheshwari <neil.v.maheshw...@gmail.com>
wrote:

> Thanks for the advice Sam. I will look into implementing a structured
> streaming connector.
>
> On Feb 19, 2017, at 11:54 AM, Sam Elamin <hussam.ela...@gmail.com> wrote:
>
> HI Niel,
>
> My advice would be to write a structured streaming connector. The new
> structured streaming APIs were brought in to handle exactly the issues you
> describe
>
> See this blog
> <https://databricks.com/blog/2016/07/28/structured-streaming-in-apache-spark.html>
>
> There isnt a structured streaming connector as of yet, but you can easily
> write one that uses the underlying batch methods to read/write to Kinesis
>
> Have a look at how I wrote my bigquery connector here
> <http://github.com/samelamin/spark-bigquery>. Plus the best thing is we
> get a new connector to a highly used datasource/sink
>
> Hope that helps
>
> Regards
> Sam
>
> On Sun, Feb 19, 2017 at 5:53 PM, Neil Maheshwari <
> neil.v.maheshw...@gmail.com> wrote:
>
> Thanks for your response Ayan.
>
> This could be an option. One complication I see with that approach is that
> I do not want to miss any records that are between the data we have batched
> to the data store and the checkpoint. I would still need a mechanism for
> recording the sequence number of the last time the data was batched, so I
> could start the streaming application after that sequence number.
>
> A similar approach could be to batch our data periodically, recording the
> last sequence number of the batch. Then, fetch data from Kinesis using the
> low level API to read data from the latest sequence number of the batched
> data up until the sequence number of the latest checkpoint from our spark
> app. I could merge batched dataset and the dataset fetched from Kinesis’s
> lower level API, and use that dataset as an RDD to prep the job.
>
> On Feb 19, 2017, at 3:12 AM, ayan guha <guha.a...@gmail.com> wrote:
>
> Hi
>
> AFAIK, Kinesis does not provide any mechanism other than check point to
> restart. That makes sense as it makes it so generic.
>
> Question: why cant you warm up your data from a data store? Say every 30
> mins you run a job to aggregate your data to a data store for that hour.
> When you restart the streaming app it would read from dynamo check point,
> but it would also preps an initial rdd from data store?
>
> Best
> Ayan
> On Sun, 19 Feb 2017 at 8:29 pm, Neil Maheshwari <
> neil.v.maheshw...@gmail.com> wrote:
>
> Hello,
>
> I am building a Spark streaming application that ingests data from an
> Amazon Kinesis stream. My application keeps track of the minimum price over
> a window for groups of similar tickets. When I deploy the application, I
> would like it to start processing at the start of the previous hours data.
> This will warm up the state of the application and allow us to deploy our
> application faster. For example, if I start the application at 3 PM, I
> would like to process the data retained by Kinesis from 2PM to 3PM, and
> then continue receiving data going forward. Spark Streaming’s Kinesis
> receiver, which relies on the Amazon Kinesis Client Library, seems to give
> me three options for choosing where to read from the stream:
>
>- read from the latest checkpointed sequence number in Dynamo
>- start from the oldest record in the stream (TRIM_HORIZON shard
>iterator type)
>- start from the most recent record in the stream (LATEST shard
>iterator type)
>
>
> Do you have any suggestions on how we could start our application at a
> specific timestamp or sequence number in the Kinesis stream? Some ideas I
> had were:
>
>- Create a KCL application that fetches the previous hour data and
>writes it to HDFS. We can create an RDD from that dataset and initialize
>our Spark Streaming job with it. The spark streaming job’s Kinesis receiver
>can have the same name as the initial KCL application, and use that
>applications checkpoint as the starting point. We’re writing our spark jobs
>in Python, so this would require launching the java MultiLang daemon, or
>writing that portion of the application in Java/Scala.
>- Before the Spark streaming application starts, we could fetch a
>shard iterator using the AT_TIMESTAMP shard iterator type. We could record
>the sequence number of the first record returned by this iterator, and
>create an entry in Dynamo for our application for that sequence number. Our
>Kinesis receiver would pick up from this checkpoint. It makes me a little
>nervous that we would be faking Kinesis Client Library's protocol by
>writing a checkpoint into Dynamo
>
>
> Thanks in advance!
>
> Neil
>
> --
> Best Regards,
> Ayan Guha
>
>
>
>


Re: [Spark Streaming] Starting Spark Streaming application from a specific position in Kinesis stream

2017-02-19 Thread Sam Elamin
Hi Neil,

My advice would be to write a structured streaming connector. The new
structured streaming APIs were brought in to handle exactly the issues you
describe

See this blog


There isn't a structured streaming connector as of yet, but you can easily
write one that uses the underlying batch methods to read/write to Kinesis

Have a look at how I wrote my bigquery connector here
. Plus the best thing is we get
a new connector to a highly used datasource/sink

Hope that helps

Regards
Sam

On Sun, Feb 19, 2017 at 5:53 PM, Neil Maheshwari <
neil.v.maheshw...@gmail.com> wrote:

> Thanks for your response Ayan.
>
> This could be an option. One complication I see with that approach is that
> I do not want to miss any records that are between the data we have batched
> to the data store and the checkpoint. I would still need a mechanism for
> recording the sequence number of the last time the data was batched, so I
> could start the streaming application after that sequence number.
>
> A similar approach could be to batch our data periodically, recording the
> last sequence number of the batch. Then, fetch data from Kinesis using the
> low level API to read data from the latest sequence number of the batched
> data up until the sequence number of the latest checkpoint from our spark
> app. I could merge batched dataset and the dataset fetched from Kinesis’s
> lower level API, and use that dataset as an RDD to prep the job.
>
> On Feb 19, 2017, at 3:12 AM, ayan guha  wrote:
>
> Hi
>
> AFAIK, Kinesis does not provide any mechanism other than check point to
> restart. That makes sense as it makes it so generic.
>
> Question: why cant you warm up your data from a data store? Say every 30
> mins you run a job to aggregate your data to a data store for that hour.
> When you restart the streaming app it would read from dynamo check point,
> but it would also preps an initial rdd from data store?
>
> Best
> Ayan
> On Sun, 19 Feb 2017 at 8:29 pm, Neil Maheshwari <
> neil.v.maheshw...@gmail.com> wrote:
>
>> Hello,
>>
>> I am building a Spark streaming application that ingests data from an
>> Amazon Kinesis stream. My application keeps track of the minimum price over
>> a window for groups of similar tickets. When I deploy the application, I
>> would like it to start processing at the start of the previous hours data.
>> This will warm up the state of the application and allow us to deploy our
>> application faster. For example, if I start the application at 3 PM, I
>> would like to process the data retained by Kinesis from 2PM to 3PM, and
>> then continue receiving data going forward. Spark Streaming’s Kinesis
>> receiver, which relies on the Amazon Kinesis Client Library, seems to give
>> me three options for choosing where to read from the stream:
>>
>>- read from the latest checkpointed sequence number in Dynamo
>>- start from the oldest record in the stream (TRIM_HORIZON shard
>>iterator type)
>>- start from the most recent record in the stream (LATEST shard
>>iterator type)
>>
>>
>> Do you have any suggestions on how we could start our application at a
>> specific timestamp or sequence number in the Kinesis stream? Some ideas I
>> had were:
>>
>>- Create a KCL application that fetches the previous hour data and
>>writes it to HDFS. We can create an RDD from that dataset and initialize
>>our Spark Streaming job with it. The spark streaming job’s Kinesis 
>> receiver
>>can have the same name as the initial KCL application, and use that
>>applications checkpoint as the starting point. We’re writing our spark 
>> jobs
>>in Python, so this would require launching the java MultiLang daemon, or
>>writing that portion of the application in Java/Scala.
>>- Before the Spark streaming application starts, we could fetch a
>>shard iterator using the AT_TIMESTAMP shard iterator type. We could record
>>the sequence number of the first record returned by this iterator, and
>>create an entry in Dynamo for our application for that sequence number. 
>> Our
>>Kinesis receiver would pick up from this checkpoint. It makes me a little
>>nervous that we would be faking Kinesis Client Library's protocol by
>>writing a checkpoint into Dynamo
>>
>>
>> Thanks in advance!
>>
>> Neil
>>
> --
> Best Regards,
> Ayan Guha
>
>
>


Re: Debugging Spark application

2017-02-16 Thread Sam Elamin
I recommend running Spark in local mode when you're first debugging your code,
just to understand what's happening and step through it, and perhaps catch a
few errors when you first start off.

I personally use IntelliJ because it's my preference. You can follow this
guide:
http://www.bigendiandata.com/2016-08-26-How-to-debug-remote-spark-jobs-with-IntelliJ/

Although it's for IntelliJ you can apply the same concepts to Eclipse *I
think*
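
As a starting point, a minimal local-mode harness looks something like this
(the object name and sample path are just placeholders):

import org.apache.spark.sql.SparkSession

object DebugMyJob {
  def main(args: Array[String]): Unit = {
    // local[*] runs everything in one JVM, so IDE breakpoints in both driver
    // and "executor" code are hit without any remote-debug setup
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("debug-run")
      .getOrCreate()

    val df = spark.read.json("src/test/resources/sample.json")  // small sample input
    df.printSchema()
    df.show(5)

    spark.stop()
  }
}

For remote debugging against a real cluster, the guide above essentially comes
down to passing the usual JDWP agent arguments through
spark.driver.extraJavaOptions (and the executor equivalent) and attaching the
IDE to that port.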


Regards
Sam


On Thu, 16 Feb 2017 at 22:00, Md. Rezaul Karim <
rezaul.ka...@insight-centre.org> wrote:

> Hi,
>
> I was looking for some URLs/documents for getting started on debugging
> Spark applications.
>
> I prefer developing Spark applications with Scala on Eclipse and then
> package the application jar before submitting.
>
>
>
> Kind regards,
> Reza
>
>
>
>


Re: Enrichment with static tables

2017-02-15 Thread Sam Elamin
You can do a join or a union to combine all the dataframes into one fat
dataframe,

or do a select on the columns you want to produce your transformed dataframe.

Not sure if I understand the question though; if the goal is just an end
state transformed dataframe, that can easily be done.
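
If it helps, a rough sketch of the join approach, where eventsRdd stands in
for your already-loaded RDD of (customerId, productId, amount) tuples and all
table and column names are invented:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

val spark = SparkSession.builder().appName("enrichment").getOrCreate()
import spark.implicits._

// Main data: turn the RDD into a dataframe with named columns
val events = eventsRdd.toDF("customer_id", "product_id", "amount")

// Small static lookup tables already loaded as dataframes
val customers = spark.read.parquet("/static/customers")   // customer_id, customer_name
val products  = spark.read.parquet("/static/products")    // product_id, product_name

// Broadcast the small tables so the joins happen map-side on each partition
val enriched = events
  .join(broadcast(customers), Seq("customer_id"), "left")
  .join(broadcast(products), Seq("product_id"), "left")
  .select("customer_id", "customer_name", "product_id", "product_name", "amount")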


Regards
Sam

On Wed, Feb 15, 2017 at 6:34 PM, Gaurav Agarwal 
wrote:

> Hello
>
> We want to enrich our spark RDD loaded with multiple Columns and multiple
> Rows . This need to be enriched with 3 different tables that i loaded 3
> different spark dataframe . Can we write some logic in spark so i can
> enrich my spark RDD with different stattic tables.
>
> Thanks
>
>


Re: Dealing with missing columns in SPARK SQL in JSON

2017-02-14 Thread Sam Elamin
Ah, if that's the case then you might need to define the schema beforehand.
Either that, or if you want to infer it, then ensure a JSON file exists with
the right schema so Spark infers the right columns,

essentially making both files one dataframe, if that makes sense.
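
A tiny sketch of the define-the-schema-beforehand option (the types are
guessed as longs here, purely for illustration):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

val spark = SparkSession.builder().appName("json-schema").getOrCreate()

// Superset of every column the files might carry
val schema = new StructType()
  .add("a", LongType)
  .add("b", LongType)

// Both reads now yield the same two columns; whichever key is absent is null
val json1DF = spark.read.schema(schema).json("json1.json")   // a=1, b=null
val json2DF = spark.read.schema(schema).json("json2.json")   // a=null, b=2

json1DF.union(json2DF).show()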

On Tue, Feb 14, 2017 at 3:04 PM, Aseem Bansal <asmbans...@gmail.com> wrote:

> Sorry if I trivialized the example. It is the same kind of file and
> sometimes it could have "a", sometimes "b", sometimes both. I just don't
> know. That is what I meant by missing columns.
>
> It would be good if I read any of the JSON and if I do spark sql and it
> gave me
>
> for json1.json
>
> a | b
> 1 | null
>
> for json2.json
>
> a | b
> null | 2
>
>
> On Tue, Feb 14, 2017 at 8:13 PM, Sam Elamin <hussam.ela...@gmail.com>
> wrote:
>
>> I may be missing something super obvious here but can't you combine them
>> into a single dataframe. Left join perhaps?
>>
>> Try writing it in sql " select a from json1 and b from josn2"then run
>> explain to give you a hint to how to do it in code
>>
>> Regards
>> Sam
>> On Tue, 14 Feb 2017 at 14:30, Aseem Bansal <asmbans...@gmail.com> wrote:
>>
>>> Say I have two files containing single rows
>>>
>>> json1.json
>>>
>>> {"a": 1}
>>>
>>> json2.json
>>>
>>> {"b": 2}
>>>
>>> I read in this json file using spark's API into a dataframe one at a
>>> time. So I have
>>>
>>> Dataset json1DF
>>> and
>>> Dataset json2DF
>>>
>>> If I run "select a, b from __THIS__" in a SQLTransformer then I will get
>>> an exception as for json1DF does not have "b" and json2DF does not have "a"
>>>
>>> How could I handle this situation with missing columns in JSON?
>>>
>>
>


Re: Dealing with missing columns in SPARK SQL in JSON

2017-02-14 Thread Sam Elamin
I may be missing something super obvious here but can't you combine them
into a single dataframe. Left join perhaps?

Try writing it in sql "select a from json1 and b from json2" then run
explain to give you a hint on how to do it in code

Regards
Sam
On Tue, 14 Feb 2017 at 14:30, Aseem Bansal  wrote:

> Say I have two files containing single rows
>
> json1.json
>
> {"a": 1}
>
> json2.json
>
> {"b": 2}
>
> I read in this json file using spark's API into a dataframe one at a time.
> So I have
>
> Dataset json1DF
> and
> Dataset json2DF
>
> If I run "select a, b from __THIS__" in a SQLTransformer then I will get
> an exception as for json1DF does not have "b" and json2DF does not have "a"
>
> How could I handle this situation with missing columns in JSON?
>


Re: how to fix the order of data

2017-02-14 Thread Sam Elamin
It's because you are just printing on the RDD: foreach runs on the executors,
so the output order depends on which partition happens to print first.

You can sort via a dataframe like below

 input.toDF("value").sort("value").collect()


or if you do not want to convert to a dataframe you can sort the RDD directly
with *sortBy* (or *sortByKey* on a pair RDD)
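
For example, a quick sketch on the RDD itself:

val input = sc.parallelize(Array(1, 2, 3))

// sortBy returns an RDD whose partitions are in sorted order, and collect()
// preserves that order when it brings the data back to the driver
input.sortBy(x => x).collect().foreach(print)   // prints 123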


Regards

Sam





On Tue, Feb 14, 2017 at 11:41 AM, 萝卜丝炒饭 <1427357...@qq.com> wrote:

>HI  all,
> the belowing is my test code. I found the output of val
> input is different. how do i fix the order please?
>
> scala> val input = sc.parallelize( Array(1,2,3))
> input: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[13] at
> parallelize at :24
>
> scala> input.foreach(print)
> 132
> scala> input.foreach(print)
> 213
> scala> input.foreach(print)
> 312


Re: Etl with spark

2017-02-12 Thread Sam Elamin
Yup I ended up doing just that thank you both
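
For anyone landing on this thread later, a rough sketch of avoiding the
key-by-key loop by handing every path to a single read; the bucket, keys and
the filter are made up, and this is a close cousin of the suggestions quoted
below rather than a literal transcript of them:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("s3-etl").getOrCreate()

// Hypothetical in-memory list of S3 keys to extract
val keys = Seq("landing/orders/2017-02-12.json", "landing/refunds/2017-02-12.json")
val paths = keys.map(k => s"s3a://my-bucket/$k")

// One read over all the paths: Spark lists and reads the files in parallel
// instead of looping over the keys one spark.read.json call at a time
val raw = spark.read.json(paths: _*)

val transformed = raw.filter("amount is not null")   // stand-in for the common transformations
transformed.write.mode("append").parquet("s3a://my-bucket/lake/")
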
On Sun, 12 Feb 2017 at 18:33, Miguel Morales <therevolti...@gmail.com>
wrote:

> You can parallelize the collection of s3 keys and then pass that to your
> map function so that files are read in parallel.
>
> Sent from my iPhone
>
> On Feb 12, 2017, at 9:41 AM, Sam Elamin <hussam.ela...@gmail.com> wrote:
>
> thanks Ayan but i was hoping to remove the dependency on a file and just
> use in memory list or dictionary
>
> So from the reading I've done today it seems.the concept of a bespoke
> async method doesn't really apply in spsrk since the cluster deals with
> distributing the work load
>
>
> Am I mistaken?
>
> Regards
> Sam
> On Sun, 12 Feb 2017 at 12:13, ayan guha <guha.a...@gmail.com> wrote:
>
> You can store the list of keys (I believe you use them in source file
> path, right?) in a file, one key per line. Then you can read the file using
> sc.textFile (So you will get a RDD of file paths) and then apply your
> function as a map.
>
> r = sc.textFile(list_file).map(your_function)
>
> HTH
>
> On Sun, Feb 12, 2017 at 10:04 PM, Sam Elamin <hussam.ela...@gmail.com>
> wrote:
>
> Hey folks
>
> Really simple question here. I currently have an etl pipeline that reads
> from s3 and saves the data to an endstore
>
>
> I have to read from a list of keys in s3 but I am doing a raw extract then
> saving. Only some of the extracts have a simple transformation but overall
> the code looks the same
>
>
> I abstracted away this logic into a method that takes in an s3 path does
> the common transformations and saves to source
>
>
> But the job takes about 10 mins or so because I'm iteratively going down a
> list of keys
>
> Is it possible to asynchronously do this?
>
> FYI I'm using spark.read.json to read from s3 because it infers my schema
>
> Regards
> Sam
>
>
>
>
> --
> Best Regards,
> Ayan Guha
>
>


Re: Etl with spark

2017-02-12 Thread Sam Elamin
Thanks Ayan, but I was hoping to remove the dependency on a file and just
use an in-memory list or dictionary.

So from the reading I've done today it seems the concept of a bespoke async
method doesn't really apply in Spark, since the cluster deals with
distributing the workload.


Am I mistaken?

Regards
Sam
On Sun, 12 Feb 2017 at 12:13, ayan guha <guha.a...@gmail.com> wrote:

You can store the list of keys (I believe you use them in source file path,
right?) in a file, one key per line. Then you can read the file using
sc.textFile (So you will get a RDD of file paths) and then apply your
function as a map.

r = sc.textFile(list_file).map(your_function)

HTH

On Sun, Feb 12, 2017 at 10:04 PM, Sam Elamin <hussam.ela...@gmail.com>
wrote:

Hey folks

Really simple question here. I currently have an etl pipeline that reads
from s3 and saves the data to an endstore


I have to read from a list of keys in s3 but I am doing a raw extract then
saving. Only some of the extracts have a simple transformation but overall
the code looks the same


I abstracted away this logic into a method that takes in an s3 path does
the common transformations and saves to source


But the job takes about 10 mins or so because I'm iteratively going down a
list of keys

Is it possible to asynchronously do this?

FYI I'm using spark.read.json to read from s3 because it infers my schema

Regards
Sam




-- 
Best Regards,
Ayan Guha


Etl with spark

2017-02-12 Thread Sam Elamin
Hey folks

Really simple question here. I currently have an etl pipeline that reads
from s3 and saves the data to an endstore


I have to read from a list of keys in s3 but I am doing a raw extract then
saving. Only some of the extracts have a simple transformation but overall
the code looks the same


I abstracted away this logic into a method that takes in an s3 path does
the common transformations and saves to source


But the job takes about 10 mins or so because I'm iteratively going down a
list of keys

Is it possible to asynchronously do this?

FYI I'm using spark.read.json to read from s3 because it infers my schema

Regards
Sam


Re: [Structured Streaming] Using File Sink to store to hive table.

2017-02-11 Thread Sam Elamin
Here's a link to the thread

http://apache-spark-developers-list.1001551.n3.nabble.com/Structured-Streaming-Dropping-Duplicates-td20884.html
On Sat, 11 Feb 2017 at 08:47, Sam Elamin <hussam.ela...@gmail.com> wrote:

> Hey Egor
>
>
> You can use for each writer or you can write a custom sink. I personally
> went with a custom sink since I get a dataframe per batch
>
>
> https://github.com/samelamin/spark-bigquery/blob/master/src/main/scala/com/samelamin/spark/bigquery/streaming/BigQuerySink.scala
>
> You can have a look at how I implemented something similar to file sink
> that in the event if a failure skips batches already written
>
>
> Also have a look at Micheals reply to me a few days ago on exactly the
> same topic. The email subject was called structured streaming. Dropping
> duplicates
>
>
> Regards
>
> Sam
>
> On Sat, 11 Feb 2017 at 07:59, Jacek Laskowski <ja...@japila.pl> wrote:
>
> "Something like that" I've never tried it out myself so I'm only
> guessing having a brief look at the API.
>
> Pozdrawiam,
> Jacek Laskowski
> 
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
>
> On Sat, Feb 11, 2017 at 1:31 AM, Egor Pahomov <pahomov.e...@gmail.com>
> wrote:
> > Jacek, so I create cache in ForeachWriter, in all "process()" I write to
> it
> > and on close I flush? Something like that?
> >
> > 2017-02-09 12:42 GMT-08:00 Jacek Laskowski <ja...@japila.pl>:
> >>
> >> Hi,
> >>
> >> Yes, that's ForeachWriter.
> >>
> >> Yes, it works with element by element. You're looking for mapPartition
> >> and ForeachWriter has partitionId that you could use to implement a
> >> similar thing.
> >>
> >> Pozdrawiam,
> >> Jacek Laskowski
> >> 
> >> https://medium.com/@jaceklaskowski/
> >> Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark
> >> Follow me at https://twitter.com/jaceklaskowski
> >>
> >>
> >> On Thu, Feb 9, 2017 at 3:55 AM, Egor Pahomov <pahomov.e...@gmail.com>
> >> wrote:
> >> > Jacek, you mean
> >> >
> >> >
> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.ForeachWriter
> >> > ? I do not understand how to use it, since it passes every value
> >> > separately,
> >> > not every partition. And addding to table value by value would not
> work
> >> >
> >> > 2017-02-07 12:10 GMT-08:00 Jacek Laskowski <ja...@japila.pl>:
> >> >>
> >> >> Hi,
> >> >>
> >> >> Have you considered foreach sink?
> >> >>
> >> >> Jacek
> >> >>
> >> >> On 6 Feb 2017 8:39 p.m., "Egor Pahomov" <pahomov.e...@gmail.com>
> wrote:
> >> >>>
> >> >>> Hi, I'm thinking of using Structured Streaming instead of old
> >> >>> streaming,
> >> >>> but I need to be able to save results to Hive table. Documentation
> for
> >> >>> file
> >> >>> sink
> >> >>>
> >> >>> says(
> http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinks
> ):
> >> >>> "Supports writes to partitioned tables. ". But being able to write
> to
> >> >>> partitioned directories is not enough to write to the table: someone
> >> >>> needs
> >> >>> to write to Hive metastore. How can I use Structured Streaming and
> >> >>> write to
> >> >>> Hive table?
> >> >>>
> >> >>> --
> >> >>> Sincerely yours
> >> >>> Egor Pakhomov
> >> >
> >> >
> >> >
> >> >
> >> > --
> >> > Sincerely yours
> >> > Egor Pakhomov
> >
> >
> >
> >
> > --
> > Sincerely yours
> > Egor Pakhomov
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: [Structured Streaming] Using File Sink to store to hive table.

2017-02-11 Thread Sam Elamin
Hey Egor


You can use a ForeachWriter or you can write a custom sink. I personally
went with a custom sink since I get a dataframe per batch

https://github.com/samelamin/spark-bigquery/blob/master/src/main/scala/com/samelamin/spark/bigquery/streaming/BigQuerySink.scala

You can have a look at how I implemented something similar to the file sink
there; in the event of a failure it skips batches already written


Also have a look at Michael's reply to me a few days ago on exactly the same
topic. The email subject was "Structured Streaming. Dropping Duplicates"
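
For anyone curious what the custom sink route looks like, a stripped-down
skeleton is below. Note that Sink is an internal, experimental Spark 2.x API
(so it can change between releases), the table name is hypothetical, and
writeBatchOut is only a stand-in for whatever batch write you already have:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.streaming.Sink

class MyTableSink(table: String) extends Sink {
  @volatile private var lastBatchId: Long = -1L

  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    // Skip batches that were already written so a restart does not duplicate data
    if (batchId > lastBatchId) {
      writeBatchOut(table, data)
      lastBatchId = batchId
    }
  }

  // Hypothetical helper: re-wrap the micro-batch as a plain batch dataframe
  // and append it to the target table
  private def writeBatchOut(table: String, data: DataFrame): Unit = {
    val batch = data.sparkSession.createDataFrame(data.rdd, data.schema)
    batch.write.mode("append").insertInto(table)
  }
}

You would also need a small StreamSinkProvider implementation that returns
this sink so writeStream can reference it by format name; the BigQuery sink
linked above follows the same shape.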


Regards

Sam

On Sat, 11 Feb 2017 at 07:59, Jacek Laskowski  wrote:

"Something like that" I've never tried it out myself so I'm only
guessing having a brief look at the API.

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Sat, Feb 11, 2017 at 1:31 AM, Egor Pahomov 
wrote:
> Jacek, so I create cache in ForeachWriter, in all "process()" I write to
it
> and on close I flush? Something like that?
>
> 2017-02-09 12:42 GMT-08:00 Jacek Laskowski :
>>
>> Hi,
>>
>> Yes, that's ForeachWriter.
>>
>> Yes, it works with element by element. You're looking for mapPartition
>> and ForeachWriter has partitionId that you could use to implement a
>> similar thing.
>>
>> Pozdrawiam,
>> Jacek Laskowski
>> 
>> https://medium.com/@jaceklaskowski/
>> Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark
>> Follow me at https://twitter.com/jaceklaskowski
>>
>>
>> On Thu, Feb 9, 2017 at 3:55 AM, Egor Pahomov 
>> wrote:
>> > Jacek, you mean
>> >
>> >
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.ForeachWriter
>> > ? I do not understand how to use it, since it passes every value
>> > separately,
>> > not every partition. And addding to table value by value would not work
>> >
>> > 2017-02-07 12:10 GMT-08:00 Jacek Laskowski :
>> >>
>> >> Hi,
>> >>
>> >> Have you considered foreach sink?
>> >>
>> >> Jacek
>> >>
>> >> On 6 Feb 2017 8:39 p.m., "Egor Pahomov" 
wrote:
>> >>>
>> >>> Hi, I'm thinking of using Structured Streaming instead of old
>> >>> streaming,
>> >>> but I need to be able to save results to Hive table. Documentation
for
>> >>> file
>> >>> sink
>> >>>
>> >>> says(
http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinks
):
>> >>> "Supports writes to partitioned tables. ". But being able to write to
>> >>> partitioned directories is not enough to write to the table: someone
>> >>> needs
>> >>> to write to Hive metastore. How can I use Structured Streaming and
>> >>> write to
>> >>> Hive table?
>> >>>
>> >>> --
>> >>> Sincerely yours
>> >>> Egor Pakhomov
>> >
>> >
>> >
>> >
>> > --
>> > Sincerely yours
>> > Egor Pakhomov
>
>
>
>
> --
> Sincerely yours
> Egor Pakhomov

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org


Structured Streaming. S3 To Google BigQuery

2017-02-08 Thread Sam Elamin
Hi All

Thank you all for the amazing support! I have written a BigQuery connector
for structured streaming that you can find here


I just tweeted
about it and would really appreciate it if you retweeted when you get a
chance

The more people know about it and use it, the more feedback I can get to
make the connector better!

Of course PRs and feedback are always welcome :)

Thanks again!

Regards
Sam


Re: specifing schema on dataframe

2017-02-06 Thread Sam Elamin
Ah ok


Thanks for clearing it up Ayan! I will give that a go



Thank you all for your help, this mailing list is awesome!

On Mon, Feb 6, 2017 at 9:07 AM, ayan guha <guha.a...@gmail.com> wrote:

> If I am not missing anything here, "So I know which columns are numeric
> and which arent because I have a StructType and all the internal
> StructFields will tell me which ones have a DataType which is numeric and
> which arent" will lead to getting to a list of fields which should be
> numeric.
>
> Essentially, You will create a list of numeric fields from your
> "should-be" struct type. Then you will load your raw data using built-in
> json reader. At this point, your data have a wrong schema. Now, you will
> need to correct it. How? You will loop over the list of numeric fields (or,
> you can do it directly on the struct type), and try to match the type. If
> you find a mismatch, you'd add a withColumn clause to cast to the correct
> data type (from your "should-be" struct).
>
> HTH?
>
> Best
> Ayan
>
> On Mon, Feb 6, 2017 at 8:00 PM, Sam Elamin <hussam.ela...@gmail.com>
> wrote:
>
>> Yup sorry I should have explained myself better
>>
>> So I know which columns are numeric and which arent because I have a
>> StructType and all the internal StructFields will tell me which ones have a
>> DataType which is numeric and which arent
>>
>> So assuming I have a json string which has double quotes on numbers when
>> it shouldnt, and I have the correct schema in a struct type
>>
>>
>> how can I iterate over them to programatically create the new dataframe
>> in the correct format
>>
>> do i iterate over the columns in the StructType? or iterate over the
>> columns in the dataframe and try to match them with the StructType?
>>
>> I hope I cleared things up, What I wouldnt do for a drawing board right
>> now!
>>
>>
>> On Mon, Feb 6, 2017 at 8:56 AM, ayan guha <guha.a...@gmail.com> wrote:
>>
>>> UmmI think the premise is you need to "know" beforehand which
>>> columns are numeric.Unless you know it, how would you apply the schema?
>>>
>>> On Mon, Feb 6, 2017 at 7:54 PM, Sam Elamin <hussam.ela...@gmail.com>
>>> wrote:
>>>
>>>> Thanks ayan but I meant how to derive the list automatically
>>>>
>>>> In your example you are specifying the numeric columns and I would like
>>>> it to be applied to any schema if that makes sense
>>>> On Mon, 6 Feb 2017 at 08:49, ayan guha <guha.a...@gmail.com> wrote:
>>>>
>>>>> SImple (pyspark) example:
>>>>>
>>>>> >>> df = sqlContext.read.json("/user/l_aguha/spark_qs.json")
>>>>> >>> df.printSchema()
>>>>> root
>>>>>  |-- customerid: string (nullable = true)
>>>>>  |-- foo: string (nullable = true)
>>>>>
>>>>> >>> numeric_field_list = ['customerid']
>>>>>
>>>>> >>> for k in numeric_field_list:
>>>>> ... df = df.withColumn(k,df[k].cast("long"))
>>>>> ...
>>>>> >>> df.printSchema()
>>>>> root
>>>>>  |-- customerid: long (nullable = true)
>>>>>  |-- foo: string (nullable = true)
>>>>>
>>>>>
>>>>> On Mon, Feb 6, 2017 at 6:56 PM, Sam Elamin <hussam.ela...@gmail.com>
>>>>> wrote:
>>>>>
>>>>> Ok thanks Micheal!
>>>>>
>>>>>
>>>>> Can I get an idea on where to start? Assuming I have the end schema
>>>>> and the current dataframe...
>>>>> How can I loop through it and create a new dataframe using the
>>>>> WithColumn?
>>>>>
>>>>>
>>>>> Am I iterating through the dataframe or the schema?
>>>>>
>>>>> I'm assuming it's easier to iterate through the columns in the old df.
>>>>> For each column cast it correctly and generate a new df?
>>>>>
>>>>>
>>>>> Would you recommend that?
>>>>>
>>>>> Regards
>>>>> Sam
>>>>> On Mon, 6 Feb 2017 at 01:12, Michael Armbrust <mich...@databricks.com>
>>>>> wrote:
>>>>>
>>>>> If you already have the expected schema, and you know that all numbers
>>>>> will always be formatted as strings in the input JSON, you could probably

Re: specifing schema on dataframe

2017-02-06 Thread Sam Elamin
Yup, sorry, I should have explained myself better

So I know which columns are numeric and which aren't because I have a
StructType, and all the internal StructFields will tell me which ones have a
DataType which is numeric and which aren't

So assuming I have a JSON string which has double quotes on numbers when it
shouldn't, and I have the correct schema in a StructType,


how can I iterate over them to programmatically create the new dataframe in
the correct format?

Do I iterate over the columns in the StructType? Or iterate over the
columns in the dataframe and try to match them with the StructType?

I hope I cleared things up. What I wouldn't do for a drawing board right
now!
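
For what it's worth, a rough sketch of that loop, driving the casts from the
target StructType; the schema here is invented and rawDF stands for the
string-typed frame you read in (it assumes every target column is present):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types._

// Target schema: customerid should be numeric, foo stays a string
val targetSchema = StructType(Array(
  StructField("customerid", LongType, true),
  StructField("foo", StringType, true)))

// Fold over the target schema's fields and cast each column to the type
// its StructField says it should have
def applySchema(raw: DataFrame, target: StructType): DataFrame =
  target.fields.foldLeft(raw) { (df, field) =>
    df.withColumn(field.name, col(field.name).cast(field.dataType))
  }

val fixed = applySchema(rawDF, targetSchema)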


On Mon, Feb 6, 2017 at 8:56 AM, ayan guha <guha.a...@gmail.com> wrote:

> UmmI think the premise is you need to "know" beforehand which columns
> are numeric.Unless you know it, how would you apply the schema?
>
> On Mon, Feb 6, 2017 at 7:54 PM, Sam Elamin <hussam.ela...@gmail.com>
> wrote:
>
>> Thanks ayan but I meant how to derive the list automatically
>>
>> In your example you are specifying the numeric columns and I would like
>> it to be applied to any schema if that makes sense
>> On Mon, 6 Feb 2017 at 08:49, ayan guha <guha.a...@gmail.com> wrote:
>>
>>> SImple (pyspark) example:
>>>
>>> >>> df = sqlContext.read.json("/user/l_aguha/spark_qs.json")
>>> >>> df.printSchema()
>>> root
>>>  |-- customerid: string (nullable = true)
>>>  |-- foo: string (nullable = true)
>>>
>>> >>> numeric_field_list = ['customerid']
>>>
>>> >>> for k in numeric_field_list:
>>> ... df = df.withColumn(k,df[k].cast("long"))
>>> ...
>>> >>> df.printSchema()
>>> root
>>>  |-- customerid: long (nullable = true)
>>>  |-- foo: string (nullable = true)
>>>
>>>
>>> On Mon, Feb 6, 2017 at 6:56 PM, Sam Elamin <hussam.ela...@gmail.com>
>>> wrote:
>>>
>>> Ok thanks Micheal!
>>>
>>>
>>> Can I get an idea on where to start? Assuming I have the end schema and
>>> the current dataframe...
>>> How can I loop through it and create a new dataframe using the
>>> WithColumn?
>>>
>>>
>>> Am I iterating through the dataframe or the schema?
>>>
>>> I'm assuming it's easier to iterate through the columns in the old df.
>>> For each column cast it correctly and generate a new df?
>>>
>>>
>>> Would you recommend that?
>>>
>>> Regards
>>> Sam
>>> On Mon, 6 Feb 2017 at 01:12, Michael Armbrust <mich...@databricks.com>
>>> wrote:
>>>
>>> If you already have the expected schema, and you know that all numbers
>>> will always be formatted as strings in the input JSON, you could probably
>>> derive this list automatically.
>>>
>>> Wouldn't it be simpler to just regex replace the numbers to remove the
>>> quotes?
>>>
>>>
>>> I think this is likely to be a slower and less robust solution.  You
>>> would have to make sure that you got all the corner cases right (i.e.
>>> escaping and what not).
>>>
>>> On Sun, Feb 5, 2017 at 3:13 PM, Sam Elamin <hussam.ela...@gmail.com>
>>> wrote:
>>>
>>> I see so for the connector I need to pass in an array/list of numerical
>>> columns?
>>>
>>> Wouldnt it be simpler to just regex replace the numbers to remove the
>>> quotes?
>>>
>>>
>>> Regards
>>> Sam
>>>
>>> On Sun, Feb 5, 2017 at 11:11 PM, Michael Armbrust <
>>> mich...@databricks.com> wrote:
>>>
>>> Specifying the schema when parsing JSON will only let you pick between
>>> similar datatypes (i.e should this be a short, long float, double etc).  It
>>> will not let you perform conversions like string <-> number.  This has to
>>> be done with explicit casts after the data has been loaded.
>>>
>>> I think you can make a solution that uses select or withColumn generic.
>>> Just load the dataframe with a "parse schema" that treats numbers as
>>> strings.  Then construct a list of columns that should be numbers and apply
>>> the necessary conversions.
>>>
>>> import org.apache.spark.sql.functions.col
>>> var df = spark.read.schema(parseSchema).json("...")
>>> numericColumns.foreach { columnName =>
>>>   df = df.withColumn(

Re: specifing schema on dataframe

2017-02-06 Thread Sam Elamin
Thanks ayan but I meant how to derive the list automatically

In your example you are specifying the numeric columns and I would like it
to be applied to any schema if that makes sense
On Mon, 6 Feb 2017 at 08:49, ayan guha <guha.a...@gmail.com> wrote:

> SImple (pyspark) example:
>
> >>> df = sqlContext.read.json("/user/l_aguha/spark_qs.json")
> >>> df.printSchema()
> root
>  |-- customerid: string (nullable = true)
>  |-- foo: string (nullable = true)
>
> >>> numeric_field_list = ['customerid']
>
> >>> for k in numeric_field_list:
> ... df = df.withColumn(k,df[k].cast("long"))
> ...
> >>> df.printSchema()
> root
>  |-- customerid: long (nullable = true)
>  |-- foo: string (nullable = true)
>
>
> On Mon, Feb 6, 2017 at 6:56 PM, Sam Elamin <hussam.ela...@gmail.com>
> wrote:
>
> Ok thanks Micheal!
>
>
> Can I get an idea on where to start? Assuming I have the end schema and
> the current dataframe...
> How can I loop through it and create a new dataframe using the WithColumn?
>
>
> Am I iterating through the dataframe or the schema?
>
> I'm assuming it's easier to iterate through the columns in the old df. For
> each column cast it correctly and generate a new df?
>
>
> Would you recommend that?
>
> Regards
> Sam
> On Mon, 6 Feb 2017 at 01:12, Michael Armbrust <mich...@databricks.com>
> wrote:
>
> If you already have the expected schema, and you know that all numbers
> will always be formatted as strings in the input JSON, you could probably
> derive this list automatically.
>
> Wouldn't it be simpler to just regex replace the numbers to remove the
> quotes?
>
>
> I think this is likely to be a slower and less robust solution.  You would
> have to make sure that you got all the corner cases right (i.e. escaping
> and what not).
>
> On Sun, Feb 5, 2017 at 3:13 PM, Sam Elamin <hussam.ela...@gmail.com>
> wrote:
>
> I see so for the connector I need to pass in an array/list of numerical
> columns?
>
> Wouldnt it be simpler to just regex replace the numbers to remove the
> quotes?
>
>
> Regards
> Sam
>
> On Sun, Feb 5, 2017 at 11:11 PM, Michael Armbrust <mich...@databricks.com>
> wrote:
>
> Specifying the schema when parsing JSON will only let you pick between
> similar datatypes (i.e should this be a short, long float, double etc).  It
> will not let you perform conversions like string <-> number.  This has to
> be done with explicit casts after the data has been loaded.
>
> I think you can make a solution that uses select or withColumn generic.
> Just load the dataframe with a "parse schema" that treats numbers as
> strings.  Then construct a list of columns that should be numbers and apply
> the necessary conversions.
>
> import org.apache.spark.sql.functions.col
> var df = spark.read.schema(parseSchema).json("...")
> numericColumns.foreach { columnName =>
>   df = df.withColumn(columnName, col(columnName).cast("long"))
> }
>
>
>
> On Sun, Feb 5, 2017 at 2:09 PM, Sam Elamin <hussam.ela...@gmail.com>
> wrote:
>
> Thanks Micheal
>
> I've been spending the past few days researching this
>
> The problem is the generated json has double quotes on fields that are
> numbers because the producing datastore doesn't want to lose precision
>
> I can change the data type true but that would be on specific to a job
> rather than a generic streaming job. I'm writing a structured streaming
> connector and I have the schema the generated dataframe should match.
>
> Unfortunately using withColumn won't help me here since the solution needs
> to be generic
>
> To summarise assume I have the following json
>
> [{
> "customerid": "535137",
> "foo": "bar"
> }]
>
>
> and I know the schema should be:
>
> StructType(Array(StructField("customerid",LongType,true),StructField("foo",StringType,true)))
>
> Whats the best way of solving this?
>
> My current approach is to iterate over the JSON and identify which fields
> are numbers and which arent then recreate the json
>
> But to be honest that doesnt seem like the cleanest approach, so happy for
> advice on this
>
> Regards
> Sam
>
> On Sun, 5 Feb 2017 at 22:00, Michael Armbrust <mich...@databricks.com>
> wrote:
>
> -dev
>
> You can use withColumn to change the type after the data has been loaded
> <https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1023043053387187/1572067047091340/2840265927289860/latest.html>
> .
>

Re: specifing schema on dataframe

2017-02-05 Thread Sam Elamin
Ok thanks Micheal!


Can I get an idea on where to start? Assuming I have the end schema and the
current dataframe...
How can I loop through it and create a new dataframe using the WithColumn?


Am I iterating through the dataframe or the schema?

I'm assuming it's easier to iterate through the columns in the old df. For
each column cast it correctly and generate a new df?


Would you recommend that?

Regards
Sam
On Mon, 6 Feb 2017 at 01:12, Michael Armbrust <mich...@databricks.com>
wrote:

> If you already have the expected schema, and you know that all numbers
> will always be formatted as strings in the input JSON, you could probably
> derive this list automatically.
>
> Wouldn't it be simpler to just regex replace the numbers to remove the
> quotes?
>
>
> I think this is likely to be a slower and less robust solution.  You would
> have to make sure that you got all the corner cases right (i.e. escaping
> and what not).
>
> On Sun, Feb 5, 2017 at 3:13 PM, Sam Elamin <hussam.ela...@gmail.com>
> wrote:
>
> I see so for the connector I need to pass in an array/list of numerical
> columns?
>
> Wouldnt it be simpler to just regex replace the numbers to remove the
> quotes?
>
>
> Regards
> Sam
>
> On Sun, Feb 5, 2017 at 11:11 PM, Michael Armbrust <mich...@databricks.com>
> wrote:
>
> Specifying the schema when parsing JSON will only let you pick between
> similar datatypes (i.e should this be a short, long float, double etc).  It
> will not let you perform conversions like string <-> number.  This has to
> be done with explicit casts after the data has been loaded.
>
> I think you can make a solution that uses select or withColumn generic.
> Just load the dataframe with a "parse schema" that treats numbers as
> strings.  Then construct a list of columns that should be numbers and apply
> the necessary conversions.
>
> import org.apache.spark.sql.functions.col
> var df = spark.read.schema(parseSchema).json("...")
> numericColumns.foreach { columnName =>
>   df = df.withColumn(columnName, col(columnName).cast("long"))
> }
>
>
>
> On Sun, Feb 5, 2017 at 2:09 PM, Sam Elamin <hussam.ela...@gmail.com>
> wrote:
>
> Thanks Micheal
>
> I've been spending the past few days researching this
>
> The problem is the generated json has double quotes on fields that are
> numbers because the producing datastore doesn't want to lose precision
>
> I can change the data type true but that would be on specific to a job
> rather than a generic streaming job. I'm writing a structured streaming
> connector and I have the schema the generated dataframe should match.
>
> Unfortunately using withColumn won't help me here since the solution needs
> to be generic
>
> To summarise assume I have the following json
>
> [{
> "customerid": "535137",
> "foo": "bar"
> }]
>
>
> and I know the schema should be:
>
> StructType(Array(StructField("customerid",LongType,true),StructField("foo",StringType,true)))
>
> Whats the best way of solving this?
>
> My current approach is to iterate over the JSON and identify which fields
> are numbers and which arent then recreate the json
>
> But to be honest that doesnt seem like the cleanest approach, so happy for
> advice on this
>
> Regards
> Sam
>
> On Sun, 5 Feb 2017 at 22:00, Michael Armbrust <mich...@databricks.com>
> wrote:
>
> -dev
>
> You can use withColumn to change the type after the data has been loaded
> <https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1023043053387187/1572067047091340/2840265927289860/latest.html>
> .
>
> On Sat, Feb 4, 2017 at 6:22 AM, Sam Elamin <hussam.ela...@gmail.com>
> wrote:
>
> Hi Direceu
>
> Thanks your right! that did work
>
>
> But now im facing an even bigger problem since i dont have access to
> change the underlying data, I just want to apply a schema over something
> that was written via the sparkContext.newAPIHadoopRDD
>
> Basically I am reading in a RDD[JsonObject] and would like to convert it
> into a dataframe which I pass the schema into
>
> Whats the best way to do this?
>
> I doubt removing all the quotes in the JSON is the best solution is it?
>
> Regards
> Sam
>
> On Sat, Feb 4, 2017 at 2:13 PM, Dirceu Semighini Filho <
> dirceu.semigh...@gmail.com> wrote:
>
> Hi Sam
> Remove the " from the number that it will work
>
> Em 4 de fev de 2017 11:46 AM, "Sam Elamin" <hussam.ela...@gmail.com>
> escreveu:
>
> Hi All
>
> I would like to specify a schema when reading fro

Re: specifing schema on dataframe

2017-02-05 Thread Sam Elamin
I see so for the connector I need to pass in an array/list of numerical
columns?

Wouldnt it be simpler to just regex replace the numbers to remove the
quotes?


Regards
Sam

On Sun, Feb 5, 2017 at 11:11 PM, Michael Armbrust <mich...@databricks.com>
wrote:

> Specifying the schema when parsing JSON will only let you pick between
> similar datatypes (i.e should this be a short, long float, double etc).  It
> will not let you perform conversions like string <-> number.  This has to
> be done with explicit casts after the data has been loaded.
>
> I think you can make a solution that uses select or withColumn generic.
> Just load the dataframe with a "parse schema" that treats numbers as
> strings.  Then construct a list of columns that should be numbers and apply
> the necessary conversions.
>
> import org.apache.spark.sql.functions.col
> var df = spark.read.schema(parseSchema).json("...")
> numericColumns.foreach { columnName =>
>   df = df.withColumn(columnName, col(columnName).cast("long"))
> }
>
>
>
> On Sun, Feb 5, 2017 at 2:09 PM, Sam Elamin <hussam.ela...@gmail.com>
> wrote:
>
>> Thanks Micheal
>>
>> I've been spending the past few days researching this
>>
>> The problem is the generated json has double quotes on fields that are
>> numbers because the producing datastore doesn't want to lose precision
>>
>> I can change the data type true but that would be on specific to a job
>> rather than a generic streaming job. I'm writing a structured streaming
>> connector and I have the schema the generated dataframe should match.
>>
>> Unfortunately using withColumn won't help me here since the solution
>> needs to be generic
>>
>> To summarise assume I have the following json
>>
>> [{
>> "customerid": "535137",
>> "foo": "bar"
>> }]
>>
>>
>> and I know the schema should be:
>> StructType(Array(StructField("customerid",LongType,true),Str
>> uctField("foo",StringType,true)))
>>
>> Whats the best way of solving this?
>>
>> My current approach is to iterate over the JSON and identify which fields
>> are numbers and which arent then recreate the json
>>
>> But to be honest that doesnt seem like the cleanest approach, so happy
>> for advice on this
>>
>> Regards
>> Sam
>>
>> On Sun, 5 Feb 2017 at 22:00, Michael Armbrust <mich...@databricks.com>
>> wrote:
>>
>>> -dev
>>>
>>> You can use withColumn to change the type after the data has been loaded
>>> <https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1023043053387187/1572067047091340/2840265927289860/latest.html>
>>> .
>>>
>>> On Sat, Feb 4, 2017 at 6:22 AM, Sam Elamin <hussam.ela...@gmail.com>
>>> wrote:
>>>
>>> Hi Direceu
>>>
>>> Thanks your right! that did work
>>>
>>>
>>> But now im facing an even bigger problem since i dont have access to
>>> change the underlying data, I just want to apply a schema over something
>>> that was written via the sparkContext.newAPIHadoopRDD
>>>
>>> Basically I am reading in a RDD[JsonObject] and would like to convert it
>>> into a dataframe which I pass the schema into
>>>
>>> Whats the best way to do this?
>>>
>>> I doubt removing all the quotes in the JSON is the best solution is it?
>>>
>>> Regards
>>> Sam
>>>
>>> On Sat, Feb 4, 2017 at 2:13 PM, Dirceu Semighini Filho <
>>> dirceu.semigh...@gmail.com> wrote:
>>>
>>> Hi Sam
>>> Remove the " from the number that it will work
>>>
>>> Em 4 de fev de 2017 11:46 AM, "Sam Elamin" <hussam.ela...@gmail.com>
>>> escreveu:
>>>
>>> Hi All
>>>
>>> I would like to specify a schema when reading from JSON, but when
>>> trying to map a number to a Double it fails; I tried FloatType and IntType
>>> with no joy!
>>>
>>>
>>> When inferring the schema, customerid is set to String, and I would like
>>> to cast it to Double
>>>
>>> so df1 comes out corrupted while df2 shows the data
>>>
>>>
>>> Also FYI, I need this to be generic as I would like to apply it to any
>>> JSON; I specified the schema below as an example of the issue I am facing
>>>
>>> import org.apache.spark.sql.types.{BinaryType, StringType, StructField, 
>>> DoubleType,FloatType, StructType, LongType,DecimalType}
>>> val testSchema = StructType(Array(StructField("customerid",DoubleType)))
>>> val df1 = 
>>> spark.read.schema(testSchema).json(sc.parallelize(Array("""{"customerid":"535137"}""")))
>>> val df2 = 
>>> spark.read.json(sc.parallelize(Array("""{"customerid":"535137"}""")))
>>> df1.show(1)
>>> df2.show(1)
>>>
>>>
>>> Any help would be appreciated, I am sure I am missing something obvious
>>> but for the life of me I can't tell what it is!
>>>
>>>
>>> Kind Regards
>>> Sam
>>>
>>>
>>>
>>>
>
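
For reference, a minimal generic sketch of the approach Michael outlines above,
under the assumption that the connector already knows the target schema (the
name targetSchema and the sample path below are illustrative): read every field
as a string, then cast each column back to the type declared in the target
schema.

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

// Illustrative target schema the connector should produce.
val targetSchema = StructType(Array(
  StructField("customerid", LongType, true),
  StructField("foo", StringType, true)))

// "Parse schema": same field names, but everything read as a string, so
// quoted numbers such as "535137" are accepted as-is.
val parseSchema = StructType(targetSchema.fields.map(_.copy(dataType = StringType)))

var df = spark.read.schema(parseSchema).json("path/to/data.json")

// Cast each column to the type declared in the target schema; string columns
// are a no-op, numeric columns get the string -> number conversion.
targetSchema.fields.foreach { f =>
  df = df.withColumn(f.name, col(f.name).cast(f.dataType))
}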


Re: specifing schema on dataframe

2017-02-05 Thread Sam Elamin
Thanks Michael

I've been spending the past few days researching this

The problem is that the generated JSON has double quotes around fields that
are numbers, because the producing datastore doesn't want to lose precision

I can change the data type, true, but that would be specific to one job
rather than a generic streaming job. I'm writing a structured streaming
connector and I have the schema the generated dataframe should match.

Unfortunately using withColumn won't help me here since the solution needs
to be generic

To summarise, assume I have the following JSON:

[{
"customerid": "535137",
"foo": "bar"
}]


and I know the schema should be:
StructType(Array(StructField("customerid",LongType,true),StructField("foo",StringType,true)))

What's the best way of solving this?

My current approach is to iterate over the JSON and identify which fields
are numbers and which aren't, then recreate the JSON

But to be honest that doesn't seem like the cleanest approach, so I'm happy
for advice on this

Regards
Sam

On Sun, 5 Feb 2017 at 22:00, Michael Armbrust <mich...@databricks.com>
wrote:

> -dev
>
> You can use withColumn to change the type after the data has been loaded
> <https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1023043053387187/1572067047091340/2840265927289860/latest.html>
> .
>
> On Sat, Feb 4, 2017 at 6:22 AM, Sam Elamin <hussam.ela...@gmail.com>
> wrote:
>
> Hi Dirceu
>
> Thanks, you're right! That did work
>
>
> But now I'm facing an even bigger problem since I don't have access to
> change the underlying data; I just want to apply a schema over something
> that was written via sparkContext.newAPIHadoopRDD
>
> Basically I am reading in a RDD[JsonObject] and would like to convert it
> into a dataframe which I pass the schema into
>
> What's the best way to do this?
>
> I doubt removing all the quotes in the JSON is the best solution, is it?
>
> Regards
> Sam
>
> On Sat, Feb 4, 2017 at 2:13 PM, Dirceu Semighini Filho <
> dirceu.semigh...@gmail.com> wrote:
>
> Hi Sam
> Remove the " from the number and it will work
>
> On 4 Feb 2017 at 11:46 AM, "Sam Elamin" <hussam.ela...@gmail.com>
> wrote:
>
> Hi All
>
> I would like to specify a schema when reading from JSON, but when trying
> to map a number to a Double it fails; I tried FloatType and IntType with no
> joy!
>
>
> When inferring the schema, customerid is set to String, and I would like
> to cast it to Double
>
> so df1 comes out corrupted while df2 shows the data
>
>
> Also FYI, I need this to be generic as I would like to apply it to any
> JSON; I specified the schema below as an example of the issue I am facing
>
> import org.apache.spark.sql.types.{BinaryType, StringType, StructField, 
> DoubleType,FloatType, StructType, LongType,DecimalType}
> val testSchema = StructType(Array(StructField("customerid",DoubleType)))
> val df1 = 
> spark.read.schema(testSchema).json(sc.parallelize(Array("""{"customerid":"535137"}""")))
> val df2 = 
> spark.read.json(sc.parallelize(Array("""{"customerid":"535137"}""")))
> df1.show(1)
> df2.show(1)
>
>
> Any help would be appreciated, I am sure I am missing something obvious
> but for the life of me I can't tell what it is!
>
>
> Kind Regards
> Sam
>
>
>
>


Re: specifing schema on dataframe

2017-02-04 Thread Sam Elamin
Hi Dirceu

Thanks, you're right! That did work


But now I'm facing an even bigger problem since I don't have access to change
the underlying data; I just want to apply a schema over something that was
written via sparkContext.newAPIHadoopRDD

Basically I am reading in a RDD[JsonObject] and would like to convert it
into a dataframe which I pass the schema into

What's the best way to do this?

I doubt removing all the quotes in the JSON is the best solution, is it?

Regards
Sam

On Sat, Feb 4, 2017 at 2:13 PM, Dirceu Semighini Filho <
dirceu.semigh...@gmail.com> wrote:

> Hi Sam
> Remove the " from the number and it will work
>
> On 4 Feb 2017 at 11:46 AM, "Sam Elamin" <hussam.ela...@gmail.com>
> wrote:
>
>> Hi All
>>
>> I would like to specify a schema when reading from JSON, but when trying
>> to map a number to a Double it fails; I tried FloatType and IntType with no
>> joy!
>>
>>
>> When inferring the schema, customerid is set to String, and I would like
>> to cast it to Double
>>
>> so df1 comes out corrupted while df2 shows the data
>>
>>
>> Also FYI, I need this to be generic as I would like to apply it to any
>> JSON; I specified the schema below as an example of the issue I am facing
>>
>> import org.apache.spark.sql.types.{BinaryType, StringType, StructField, 
>> DoubleType,FloatType, StructType, LongType,DecimalType}
>> val testSchema = StructType(Array(StructField("customerid",DoubleType)))
>> val df1 = 
>> spark.read.schema(testSchema).json(sc.parallelize(Array("""{"customerid":"535137"}""")))
>> val df2 = 
>> spark.read.json(sc.parallelize(Array("""{"customerid":"535137"}""")))
>> df1.show(1)
>> df2.show(1)
>>
>>
>> Any help would be appreciated, I am sure I am missing something obvious
>> but for the life of me I can't tell what it is!
>>
>>
>> Kind Regards
>> Sam
>>
>
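
A rough sketch of the RDD[JsonObject] route, assuming the objects came out of
sparkContext.newAPIHadoopRDD and that their toString() re-emits the raw JSON
text (true for Gson's JsonObject); the sample data and field names below are
illustrative, and in the real job the parallelized strings would instead be
jsonObjectRdd.map(_.toString):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Stand-in for the JSON strings recovered from the RDD[JsonObject].
val rawJson: RDD[String] =
  sc.parallelize(Seq("""{"customerid":"535137","foo":"bar"}"""))

// Explicit schema applied at read time; keep numbers as strings here and
// cast them afterwards, as discussed earlier in the thread.
val parseSchema = StructType(Array(
  StructField("customerid", StringType, true),
  StructField("foo", StringType, true)))

val df = spark.read.schema(parseSchema).json(rawJson)
df.show()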


specifing schema on dataframe

2017-02-04 Thread Sam Elamin
Hi All

I would like to specify a schema when reading from JSON, but when trying
to map a number to a Double it fails; I tried FloatType and IntType with no
joy!


When inferring the schema, customerid is set to String, and I would like to
cast it to Double

so df1 comes out corrupted while df2 shows the data


Also FYI, I need this to be generic as I would like to apply it to any JSON;
I specified the schema below as an example of the issue I am facing

import org.apache.spark.sql.types.{BinaryType, StringType,
StructField, DoubleType,FloatType, StructType, LongType,DecimalType}
val testSchema = StructType(Array(StructField("customerid",DoubleType)))
val df1 = 
spark.read.schema(testSchema).json(sc.parallelize(Array("""{"customerid":"535137"}""")))
val df2 = spark.read.json(sc.parallelize(Array("""{"customerid":"535137"}""")))
df1.show(1)
df2.show(1)


Any help would be appreciated, I am sure I am missing something obvious but
for the life of me I can't tell what it is!


Kind Regards
Sam


Re: java.lang.NoSuchMethodError: scala.runtime.ObjectRef.zero()Lscala/runtime/ObjectRef

2017-02-04 Thread Sam Elamin
Hi sathyanarayanan


zero() on scala.runtime.VolatileObjectRef was introduced in Scala 2.11.
You probably have a library compiled against Scala 2.11 running on a
Scala 2.10 runtime.

See

v2.10:
https://github.com/scala/scala/blob/2.10.x/src/library/scala/runtime/VolatileObjectRef.java
v2.11:
https://github.com/scala/scala/blob/2.11.x/src/library/scala/runtime/VolatileObjectRef.java

Regards
Sam

On Sat, 4 Feb 2017 at 09:24, sathyanarayanan mudhaliyar <
sathyanarayananmudhali...@gmail.com> wrote:

> Hi,
> I got the error below when I executed my job:
>
> Exception in thread "main" java.lang.NoSuchMethodError:
> scala.runtime.ObjectRef.zero()Lscala/runtime/ObjectRef;
>
> error in detail:
>
> Exception in thread "main" java.lang.NoSuchMethodError:
> scala.runtime.ObjectRef.zero()Lscala/runtime/ObjectRef;
> at
> com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala)
> at
> com.datastax.spark.connector.cql.CassandraConnector$$anonfun$3.apply(CassandraConnector.scala:149)
> at
> com.datastax.spark.connector.cql.CassandraConnector$$anonfun$3.apply(CassandraConnector.scala:149)
> at
> com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:31)
> at
> com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:56)
> at
> com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:82)
> at com.nwf.Consumer.main(Consumer.java:63)
>
> code :
>
> Consumer consumer = new Consumer();
> SparkConf conf = new
> SparkConf().setAppName("kafka-sandbox").setMaster("local[2]");
> conf.set("spark.cassandra.connection.host", "localhost"); //connection
> for cassandra database
> JavaSparkContext sc = new JavaSparkContext(conf);
> CassandraConnector connector = CassandraConnector.apply(sc.getConf());
> final Session session = connector.openSession();
> final PreparedStatement prepared = session.prepare("INSERT INTO
> spark_test5.messages JSON?");
>
>
> The error is in the highlighted line (per the stack trace, the
> connector.openSession() call at Consumer.java:63).
> Thank you guys.
>
>
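
A minimal sketch of the usual fix, written as an sbt build definition (the
versions are illustrative, and the same idea applies to a Maven pom via the
artifactId suffix): keep the Scala binary version identical for the runtime,
Spark, and the Cassandra connector, so a jar built for one Scala version never
lands on a classpath running another. Here everything is pinned to 2.10 to
match a Spark 1.6 runtime; moving everything to 2.11 works equally well.

// build.sbt (sketch)
scalaVersion := "2.10.6"  // must match the Scala version your Spark build uses

libraryDependencies ++= Seq(
  // %% appends the matching _2.10 suffix automatically, which is what
  // prevents the binary mismatch behind the NoSuchMethodError above.
  "org.apache.spark"   %% "spark-core"                % "1.6.3" % "provided",
  "com.datastax.spark" %% "spark-cassandra-connector" % "1.6.0"
)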