ETL process design

2015-01-28 Thread Danny Yates
Hi,

My apologies for what has ended up as quite a long email with a lot of
open-ended questions, but, as you can see, I'm really struggling to get
started and would appreciate some guidance from people with more
experience. I'm new to Spark and big data in general, and I'm struggling
with what I suspect is actually a fairly simple problem.

For background, this process will run on an EMR cluster in AWS. My files
are all in S3, but the S3 access is pretty straightforward in that
environment, so I'm not overly concerned about that at the moment.

I have a process (or rather, a number of processes) which drop JSON
events into files in directories in S3 structured by the date the events
arrived. I say JSON because they're one JSON message per line, rather
than one per file. That is, they are amenable to being loaded with
sc.jsonFile(). The directory structure is
s3://bucket/path/yyyy-mm-dd/many-files-here, where yyyy-mm-dd is the
received date of the events.

Depending on the environment, there could be 4,000 - 5,000 files in each
directory, each having up to 3,000 lines (events) in them. So plenty of scope
for parallelism. In general, there will be something like 2,000,000 events
per day initially.

The incoming events are of different types (page views, item purchases,
etc.) but are currently all bundled into the same set of input files. So
the JSON is not uniform across different lines within each file. I'm
amenable to changing this if that's helpful and having the events broken
out into different files by event type.

Oh, and there could be duplicates too, which will need removing. :-)

My challenge is to take these files and transform them into a longer-term
storage format suitable for both overnight analytics and ad-hoc
querying. I'm happy for this process to just happen once a day - say, at
1am and process the whole of the previous day's received data.

I'm thinking that having Parquet files stored in Hive-like partitions would be
a sensible way forward:
s3://bucket/different-path/t=type/y=yyyy/m=mm/d=dd/whatever.parquet. Here,
yyyy, mm and dd represent the time the event happened, rather than the time
it arrived. Does that sound sensible? Do you have any other recommendations?
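
To make that concrete, here's the sort of thing I have in mind for building
the partition path from an event (just a sketch; the bucket and path are
placeholders, and I'm assuming the timestamp field starts with the date in
yyyy-mm-dd form):

// Just a sketch: derive the target partition path for one event from its own
// timestamp (assumed to start with "yyyy-mm-dd") and its event type.
// Bucket and path are placeholders.
def partitionPath(eventType: String, ts: String): String = {
  val Array(y, m, d) = ts.take(10).split("-")   // e.g. "2015-01-27T13:45:00Z"
  s"s3://bucket/different-path/t=$eventType/y=$y/m=$m/d=$d"
}

// partitionPath("page_view", "2015-01-27T13:45:00Z")
//   => s3://bucket/different-path/t=page_view/y=2015/m=01/d=27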

So I need to read each line, parse the JSON, deduplicate the data, decide
which event type it is, and output it to the right file in the right
directory.

I'm struggling with... well... most of it, if I'm honest. Here's what I
have so far.

val data = sc.textFile("s3://bucket/path/yyyy-mm-dd/*")   // load all files for the given received date

// Deduplicate
val dedupe = data.map(line => {
  val json = new com.fasterxml.jackson.databind.ObjectMapper().readTree(line)
  val _id = json.get("_id").asText()        // _id is a key that can be used to dedupe
  val event = json.get("event").asText()    // event is the event type
  val ts = json.get("timestamp").asText()   // timestamp is when the event happened

  (_id, (event, ts, line))   // I figure having event, ts and line at this point will save time later
}).reduceByKey((a, b) => a)  // For any given pair of lines with the same _id, pick one arbitrarily

At this point, I guess I'm going to have to split this apart by event type
(I'm happy to have a priori knowledge of the event types) and formally
parse each line using a schema to get a SchemaRDD so I can write out
Parquet files. I have exactly zero idea how to approach this part.
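
The best guess I've pieced together from the docs looks something like this
(completely untested; Spark 1.2-style API; the event type list and paths are
placeholders):

// Completely untested sketch: split by event type, let Spark SQL infer a
// schema from the JSON, and write Parquet per type. Event types and paths
// are placeholders; this ignores the y/m/d split for now.
import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
val eventTypes = Seq("page_view", "purchase")   // known a priori

eventTypes.foreach { t =>
  val lines = dedupe.filter { case (_, (event, _, _)) => event == t }
                    .map { case (_, (_, _, line)) => line }

  val schemaRdd = sqlContext.jsonRDD(lines)       // infer schema => SchemaRDD
  schemaRdd.saveAsParquetFile(s"s3://bucket/different-path/t=$t/")
}

Is jsonRDD + saveAsParquetFile the right pair of calls here, or is there a
better way to apply a known schema per event type?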

The other wrinkle here is that Spark seems to want to own the directory
it writes to. But it's possible that on any given run we might pick up a
few left-over events for a previous day, so we need to be able to handle
the situation where we're adding events for a day we've already processed.
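
The only idea I've had so far is to give each run its own subdirectory under
the day partition, so Spark always writes to a fresh directory and later runs
just add siblings. Again, just a sketch, and I don't know whether downstream
readers will happily pick up nested directories:

// Sketch: write each run into a fresh subdirectory under the partition, e.g.
//   .../t=page_view/y=2015/m=01/d=27/run-20150128T0100/part-*.parquet
// I'd need to check that Hive/Spark will read nested directories like this.
val runId = new java.text.SimpleDateFormat("yyyyMMdd'T'HHmm").format(new java.util.Date)
val target = s"s3://bucket/different-path/t=page_view/y=2015/m=01/d=27/run-$runId"
schemaRdd.saveAsParquetFile(target)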

Many thanks,

Danny.


Re: ETL process design

2015-01-28 Thread Stadin, Benjamin
Hi Danny,

What you describe sounds like you might also want to consider using Spring XD
instead, at least for the file-centric stuff.

Regards
Ben

Sent from my iPad


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org