Re: Querying on Deeply Nested JSON Structures

2017-07-15 Thread Matt Deaver
I would love to be told otherwise, but I believe your options are to either
1) use the explode function or 2) pre-process the data so you don't have to
explode it.
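
For illustration, a rough sketch of both options in PySpark (the column names
'id', 'stats', 'mean' and the output path are hypothetical):

from pyspark.sql.functions import col, explode

# Option 1: explode the nested array, then pull out just the field needed.
exploded = df_nested.withColumn("s", explode(col("stats")))
means = exploded.select(col("s.mean"))

# Option 2: pre-process once -- flatten the fields you query and persist the
# result, so later queries read flat columns instead of exploding every time.
flat = (df_nested
        .select(col("id"), explode(col("stats")).alias("s"))
        .select("id", "s.mean", "s.median", "s.mode"))
flat.write.mode("overwrite").parquet("/tmp/flattened_stats")

Note that if the field sits inside a struct rather than an array, a plain
dot-notation select (df_nested.select("stats.mean")) avoids the explode
entirely.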

On Jul 15, 2017 11:41 AM, "Patrick"  wrote:

> Hi,
>
> We need to query a deeply nested JSON structure. However, the query is on a
> single field at a nested level, such as mean, median, or mode.
>
> I am aware of the SQL explode function:
>
> df = df_nested.withColumn('exploded', explode('top'))
>
> But this is too slow.
>
> Is there any other strategy that could give us the best performance when
> querying nested JSON in a Spark Dataset?
>
>
> Thanks
>
>
>


Re: Python Spark for full fledged ETL

2017-06-29 Thread Matt Deaver
While you could do this in Spark, it stinks of over-engineering. An ETL tool
would be more appropriate, and if budget is an issue you could look at
alternatives like Pentaho or Talend.
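
For reference, the foreachPartition + ODBC pattern Upkar asks about in the
quoted thread below might look roughly like this sketch; the DSN, table, DML
statement, and the df DataFrame are all hypothetical, and each executor needs
the driver installed:

import pyodbc

CONN_STR = "DSN=my_dwh;UID=etl_user;PWD=secret"  # placeholder connection string

def apply_dml(rows):
    # One connection per partition, not per row.
    conn = pyodbc.connect(CONN_STR)
    cur = conn.cursor()
    for row in rows:
        # Example DML; the real statements depend on the target database.
        cur.execute("UPDATE target_table SET amount = ? WHERE id = ?",
                    (row["amount"], row["id"]))
    conn.commit()
    conn.close()

df.rdd.foreachPartition(apply_dml)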

On Thu, Jun 29, 2017 at 8:48 PM,  wrote:

> Hi,
>
> One more thing - I am talking about Spark in cluster mode without Hadoop.
>
> Regards,
> Upkar
>
> Sent from my iPhone
>
> On 30-Jun-2017, at 07:55, upkar.ko...@gmail.com wrote:
>
> Hi,
>
> This is my line of thinking - Spark offers a variety of transformations
> that would support most of the use cases for replacing an ETL tool such as
> Informatica. The E and T parts of ETL are well covered; loading generally
> requires more functionality, though. Spinning up an Informatica cluster,
> which also has a master-slave architecture, would cost $$. I know Pentaho
> and other such tools are there to support the use case, but can we do the
> same with a Spark cluster?
>
> Regards,
> Upkar
>
> Sent from my iPhone
>
> On 29-Jun-2017, at 22:06, Gourav Sengupta 
> wrote:
>
> SPARK + JDBC.
>
> But Why?
>
> Regards,
> Gourav Sengupta
>
> On Thu, Jun 29, 2017 at 3:44 PM, upkar_kohli 
> wrote:
>
>> Hi,
>>
>> Has anyone tried mixing Spark with some of the other Python JDBC/ODBC
>> packages to create an end-to-end ETL framework? The framework would enable
>> update, delete, and other DML operations, along with stored procedure /
>> function calls, across a variety of databases. Any setup that would be
>> easy to use.
>>
>> I only know of a few ODBC Python packages that are production-ready and
>> widely used, such as pyodbc or SQLAlchemy.
>>
>> JayDeBeApi, which can interface with JDBC, is in beta.
>>
>> Would it be a bad use case if this is attempted with foreachPartition in
>> Spark? If not, what would be a good stack for such an implementation
>> using Python?
>>
>> Regards,
>> Upkar
>>
>
>


-- 
Regards,

Matt
Data Engineer
https://www.linkedin.com/in/mdeaver
http://mattdeav.pythonanywhere.com/


Re: community feedback on RedShift with Spark

2017-04-24 Thread Matt Deaver
Redshift COPY is immensely faster than doing insert statements. I did some
rough testing of loading data with INSERT and with COPY, and COPY is so
vastly superior that if speed matters at all to your process you shouldn't
even consider using INSERT.
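
As a rough sketch of the write-to-S3-then-COPY approach (the df DataFrame,
bucket, target table, host, credentials, and IAM role below are all
placeholders; psycopg2 is just one Postgres-compatible driver that can issue
the COPY):

import psycopg2

# 1) Land the Spark output in S3 as delimited files.
df.write.mode("overwrite").csv("s3a://my-bucket/staging/events/")

# 2) Tell Redshift to COPY the staged files in one bulk operation.
copy_sql = """
    COPY analytics.events
    FROM 's3://my-bucket/staging/events/'
    IAM_ROLE 'arn:aws:iam::123456789012:role/redshift-copy'
    DELIMITER ',';
"""
conn = psycopg2.connect(host="redshift-cluster.example.com", port=5439,
                        dbname="warehouse", user="etl", password="secret")
with conn, conn.cursor() as cur:
    cur.execute(copy_sql)
conn.close()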

On Mon, Apr 24, 2017 at 11:07 AM, Afshin, Bardia <
bardia.afs...@capitalone.com> wrote:

> I wanted to reach out to the community to get an understanding of what
> everyone's experience is with maximizing performance, i.e. decreasing the
> time it takes to load multiple large datasets into Redshift.
>
>
>
> Two approaches:
>
> 1.   Spark writes files to S3, then Redshift runs a COPY from the S3 bucket.
>
> 2.   Spark writes results directly to Redshift via JDBC.
>
>
>
> JDBC is known for poor performance, and Redshift (without providing any
> examples) claims you can speed up loading from S3 buckets via different
> queues set up in your Redshift Workload Management.
>
>
>
> What’s the community’s experience with designing processes that push large
> datasets into Redshift in the minimum time it takes to load the data?
>
>



-- 
Regards,

Matt
Data Engineer
https://www.linkedin.com/in/mdeaver
http://mattdeav.pythonanywhere.com/


Re: [Spark-SQL] : Incremental load in Pyspark

2017-04-11 Thread Matt Deaver
It's pretty simple, really: run your processing job as often as you want
during the week. Then, when loading into the base table, apply a window
function partitioned by the primary key(s) and ordered by the updated-time
column, delete the existing rows with those pks, and load that data.
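
A minimal sketch of that window step (the new_data frame and the column names
'pk' and 'updated_at' are hypothetical):

from pyspark.sql import Window
from pyspark.sql.functions import col, row_number

w = Window.partitionBy("pk").orderBy(col("updated_at").desc())

# Keep only the most recent row per primary key from the week's updates.
latest = (new_data
          .withColumn("rn", row_number().over(w))
          .filter(col("rn") == 1)
          .drop("rn"))

# 'latest' is what gets loaded after deleting the matching pks from the base
# table.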

On Tue, Apr 11, 2017 at 2:08 PM, Vamsi Makkena <kv.makk...@gmail.com> wrote:

> Hi Matt,
>
> Thanks for your reply.
>
> I will get updates regularly, but I want to load the updated data once a
> week. A staging table may solve this issue, but I'm looking for how the
> row-updated time should be included in the query.
>
> Thanks
>
> On Tue, Apr 11, 2017 at 2:59 PM Matt Deaver <mattrdea...@gmail.com> wrote:
>
>> Do you have updates coming in on your data flow? If so, you will need a
>> staging table and a merge process into your Teradata tables.
>>
>> If you do not have updated rows (i.e. your Teradata tables are append-only),
>> you can process the data and insert (bulk load) it into Teradata.
>>
>> I don't have experience doing this directly in Spark, but according to
>> this post
>> https://community.hortonworks.com/questions/63826/hi-is-there-any-connector-for-teradata-to-sparkwe.html
>> you will need to use a JDBC driver to connect.
>>
>> On Tue, Apr 11, 2017 at 1:23 PM, Vamsi Makkena <kv.makk...@gmail.com>
>> wrote:
>>
>> I am reading data from Oracle tables and flat files (a new Excel file
>> every week) and writing it to Teradata weekly using Pyspark.
>>
>> In the initial run it will load all the data to Teradata. But in later
>> runs I just want to read only the new records from Oracle and the flat
>> files and append them to the Teradata tables.
>>
>> How can I do this using Pyspark, without touching the Oracle and Teradata
>> tables?
>>
>> Please post the sample code if possible.
>>
>> Thanks
>>
>>
>>
>>
>> --
>> Regards,
>>
>> Matt
>> Data Engineer
>> https://www.linkedin.com/in/mdeaver
>> http://mattdeav.pythonanywhere.com/
>>
>


-- 
Regards,

Matt
Data Engineer
https://www.linkedin.com/in/mdeaver
http://mattdeav.pythonanywhere.com/


Re: [Spark-SQL] : Incremental load in Pyspark

2017-04-11 Thread Matt Deaver
Do you have updates coming in on your data flow? If so, you will need a
staging table and a merge process into your Teradata tables.

If you do not have updated rows (i.e. your Teradata tables are append-only),
you can process the data and insert (bulk load) it into Teradata.

I don't have experience doing this directly in Spark, but according to
this post
https://community.hortonworks.com/questions/63826/hi-is-there-any-connector-for-teradata-to-sparkwe.html
you will need to use a JDBC driver to connect.
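
For the append-only case, the JDBC write might look roughly like this (the
URL, credentials, table name, and the new_rows frame are placeholders, and the
Teradata JDBC jars have to be on the Spark classpath, e.g. via --jars):

jdbc_url = "jdbc:teradata://td-host/DATABASE=analytics"  # placeholder

(new_rows.write
    .format("jdbc")
    .option("url", jdbc_url)
    .option("dbtable", "analytics.weekly_facts")
    .option("user", "etl")
    .option("password", "secret")
    .option("driver", "com.teradata.jdbc.TeraDriver")
    .mode("append")
    .save())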

On Tue, Apr 11, 2017 at 1:23 PM, Vamsi Makkena  wrote:

> I am reading the data from Oracle tables and Flat files (new excel file
> every week) and write it to Teradata weekly using Pyspark.
>
> In the initial run it will load the all the data to Teradata. But in the
> later runs I just want to read the new records from Oracle and Flatfiles
> and want to append it to teradata tables.
>
> How can I do this using Pyspark, without touching the oracle and teradata
> tables?
>
> Please post the sample code if possible.
>
> Thanks
>



-- 
Regards,

Matt
Data Engineer
https://www.linkedin.com/in/mdeaver
http://mattdeav.pythonanywhere.com/


Best way to deal with skewed partition sizes

2017-03-22 Thread Matt Deaver
For various reasons, our data set is partitioned in Spark by customer id
and saved to S3. When trying to read this data, however, the larger
partitions make it difficult to parallelize jobs. For example, out of a
couple thousand companies, some have <10 MB of data while others have >10 GB.
This is the code I'm using in a Zeppelin notebook and it takes a very long
time to read in (2+ hours on a ~200 GB dataset from S3):

df1 = sqlContext.read.parquet("s3a://[bucket1]/[prefix1]/")
df2 = sqlContext.read.parquet("s3a://[bucket2]/[prefix2]/")

# generate a bunch of derived columns here for df1
df1 = df1.withColumn('derivedcol1', df1.source_col)

# limit output columns for later union
df1 = df1.select(
    [limited set of columns]
)

# generate a bunch of derived columns here for df2
df2 = df2.withColumn('derivedcol1', df2.source_col)

# limit output columns for later union
df2 = df2.select(
    [limited set of columns]
)

print(df1.rdd.getNumPartitions())
print(df2.rdd.getNumPartitions())

merge_df = df1.unionAll(df2)
# repartition returns a new DataFrame, so the result has to be reassigned
merge_df = merge_df.repartition(300)

merge_df.registerTempTable("union_table")
sqlContext.cacheTable("union_table")
sqlContext.sql("select count(*) from union_table").collect()

Any suggestions on making this faster?


Re: Spark streaming to kafka exactly once

2017-03-22 Thread Matt Deaver
You have to handle de-duplication upstream or downstream. It might
technically be possible to handle it in Spark, but you'll probably have a
better time handling duplicates in the service that reads from Kafka.
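
One hedged illustration of making the duplicates harmless downstream: attach a
deterministic key (e.g. an event id) when producing, so whatever reads the
topic can discard replays caused by batch retries. The broker, topic, and
'event_id' field are hypothetical, this assumes kafka-python is installed on
the executors, and 'rdd' stands in for the RDD handed to foreachRDD:

import json
from kafka import KafkaProducer

def send_partition(records):
    producer = KafkaProducer(bootstrap_servers="broker:9092")
    for rec in records:
        # Keying by a stable id lets the downstream consumer de-duplicate;
        # this assumes each record is a Row with an 'event_id' field.
        producer.send("events",
                      key=str(rec["event_id"]).encode("utf-8"),
                      value=json.dumps(rec.asDict()).encode("utf-8"))
    producer.flush()
    producer.close()

rdd.foreachPartition(send_partition)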

On Wed, Mar 22, 2017 at 1:49 PM, Maurin Lenglart 
wrote:

> Hi,
> we are trying to build a Spark Streaming solution that subscribes to and
> pushes to Kafka.
>
> But we are running into the problem of duplicate events.
>
> Right now, I am doing a "foreachRDD", looping over the messages of each
> partition and sending those messages to Kafka.
>
>
>
> Is there any good way of solving that issue?
>
>
>
> thanks
>



-- 
Regards,

Matt
Data Engineer
https://www.linkedin.com/in/mdeaver
http://mattdeav.pythonanywhere.com/


Re: Merging Schema while reading Parquet files

2017-03-21 Thread Matt Deaver
You could create a one-time job that reprocesses the historical data to match
the updated format.
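
One possible shape for that backfill, assuming "col1" already exists as an
ordinary column in the old files and the day/filling partition columns are
inferred from the existing directory layout (both paths are placeholders):

# Read job A's old output, where col1 is still an ordinary column...
old = spark.read.parquet("file://path1/name/sample_old/")

# ...and rewrite it with the partition layout the job produces today, so
# job B sees consistent partition columns everywhere.
(old.write
    .mode("overwrite")
    .partitionBy("day", "filling", "col1")
    .parquet("file://path1/name/sample_backfilled/"))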

On Tue, Mar 21, 2017 at 8:53 AM, Aditya Borde  wrote:

> Hello,
>
> I'm currently blocked by this issue:
>
> I have a job "A" whose output is partitioned by one of the fields, "col1".
> Job "B" reads the output of job "A".
>
> Here comes the problem: job "A"'s output was previously not partitioned by
> "col1" (this is a recent change), so none of my older data from job "A" is
> partitioned by "col1". When I run job "B" over both the previous and the
> current data, it fails with "inconsistent partition column names".
>
> The read path is something like "file://path1/name/sample/", which contains
> further directories like "day=2017-02-15/filling=5/xyz1".
>
> The job now generates one more, deeper directory level in the input path:
> "/day=2017-02-15/filling=5/col1/xyz2".
>
> "mergeSchema" is not working here because my base path has multiple
> directories under which the files reside.
>
> Can someone suggest an effective solution here?
>
> Regards,
> Aditya Borde
>



-- 
Regards,

Matt
Data Engineer
https://www.linkedin.com/in/mdeaver
http://mattdeav.pythonanywhere.com/


Recombining output files in parallel

2017-03-20 Thread Matt Deaver
I have a Spark job that processes incremental data and partitions it by
customer id. Some customers have very little data, and I have another job
that takes a previous period's data and combines it. However, the job runs
serially and I'd basically like to run the function on every partition
simultaneously. Is that possible? Here's the relevant code:

import boto3

client = boto3.client('s3')
result = client.list_objects(Bucket=source_bucket, Prefix=source_prefix,
                             Delimiter='/')
for o in result.get('CommonPrefixes'):
    folder = o.get('Prefix')
    folder_objects = client.list_objects(Bucket=source_bucket,
                                         Prefix=folder)
    size_limit = 1024 * 1024 * 1024  # 1 GB
    small_files = []
    for s3_object in folder_objects.get('Contents'):
        s3_size = s3_object.get('Size')
        s3_key = s3_object.get('Key')
        if s3_size < size_limit:
            small_files.append(['s3a://{}/{}'.format(source_bucket, s3_key),
                                s3_size])
    if small_files:
        files_list = [x[0] for x in small_files]
        files_size = sum([x[1] for x in small_files])
        num_partitions = int(files_size / size_limit)
        if num_partitions < 1:
            num_partitions = 1
        agg_df = spark.read.parquet(*files_list)
        agg_df = agg_df.repartition(num_partitions)
        agg_df.write.mode('overwrite').parquet(
            's3a://{}/{}/'.format(target_bucket, folder))
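
One possible way to get the per-customer compactions running concurrently (a
sketch only): Spark will run jobs submitted from separate driver threads at
the same time, so the loop body can be wrapped in a helper and handed to a
small thread pool. compact_prefix below is a hypothetical wrapper around the
list/read/repartition/write logic above.

from concurrent.futures import ThreadPoolExecutor

def compact_prefix(prefix):
    # Hypothetical helper: list the small files under `prefix`, read them,
    # repartition, and write them back out, exactly as in the loop above.
    ...

prefixes = [o.get('Prefix') for o in result.get('CommonPrefixes')]
with ThreadPoolExecutor(max_workers=8) as pool:
    list(pool.map(compact_prefix, prefixes))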