Re: ETL process design

2015-01-28 Thread Stadin, Benjamin
Hi Danny,

From what you describe, you might also consider using Spring XD instead, 
at least for the file-centric stuff.


 On 28.01.2015 at 10:42, Danny Yates wrote:
 My apologies for what has ended up as quite a long email with a lot of 
 open-ended questions, but, as you can see, I'm really struggling to get 
 started and would appreciate some guidance from people with more experience. 
 I'm new to Spark and big data in general, and I'm struggling with what I 
 suspect is actually a fairly simple problem.
 For background, this process will run on an EMR cluster in AWS. My files are 
 all in S3, but the S3 access is pretty straightforward in that environment, 
 so I'm not overly concerned about that at the moment.
 I have a process (or rather, a number of processes) which drop JSON events 
 into files in directories in S3 structured by the date the events arrived. I 
 say JSON because they're one JSON message per line, rather than one per 
 file. That is, they are amenable to being loaded with sc.jsonFile(). The 
 directory structure is s3://bucket/path/yyyy-mm-dd/many-files-here, where 
 yyyy-mm-dd is the received date of the events.
 Depending on the environment, there could be 4,000 - 5,000 files in each 
 directory, each having up to 3,000 lines (events) in. So plenty of scope for 
 parallelism. In general, there will be something like 2,000,000 events per 
 day initially.
 The incoming events are of different types (page views, item purchases, etc.) 
 but are currently all bundled into the same set of input files. So the JSON 
 is not uniform across different lines within each file. I'm amenable to 
 changing this if that's helpful and having the events broken out into 
 different files by event type.
 Oh, and there could be duplicates too, which will need removing. :-)
 My challenge is to take these files and transfer them into a more long-term 
 storage format suitable for both overnight analytics and also ad-hoc 
 querying. I'm happy for this process to just happen once a day - say, at 1am 
 and process the whole of the previous day's received data.
 I'm thinking that having Parquet files stored in Hive-like partitions would be a 
 sensible way forward: 
 s3://bucket/different-path/t=type/y=yyyy/m=mm/d=dd/whatever.parquet. Here, 
 yyyy, mm and dd represent the time the event happened, rather than the time 
 it arrived. Does that sound sensible? Do you have any other recommendations?
 So I need to read each line, parse the JSON, deduplicate the data, decide 
 which event type it is, and output it to the right file in the right 
 partition. I'm struggling with... well... most of it, if I'm honest. Here's 
 what I have so far.
 val data = sc.textFile("s3:///yyyy-mm-dd/*")  // load all files for a given received date
 // Deduplicate
 val dedupe = data.map(line => {
   // Presumably Jackson's ObjectMapper; the original text is cut off after "new"
   val json = new ObjectMapper().readTree(line)
   val _id = json.get("_id").asText()          // _id is a key that can be used to deduplicate
   val event = json.get("event").asText()      // event is the event type
   val ts = json.get("timestamp").asText()     // timestamp is when the event happened
   (_id, (event, ts, line))   // I figure having event, ts and line at this point will save time later
 }).reduceByKey((a, b) => a)  // For any given pair of lines with the same _id, pick one arbitrarily
 At this point, I guess I'm going to have to split this apart by event type 
 (I'm happy to have a priori knowledge of the event types) and formally 
 parse each line using a schema to get a SchemaRDD so I can write out Parquet 
 files. I have exactly zero idea how to approach this part.
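
A minimal sketch of the dedupe-and-split step described above, written against plain Scala collections so that it runs without a cluster. The `Event` case class, the object and function names, and the ISO-8601 UTC timestamp format are assumptions made for illustration; nothing in the thread specifies them. On an RDD the same shape would apply (key by `_id`, keep one record per key, then bucket by target partition), with each bucket written out as Parquet afterwards, which is not shown here.

```scala
import java.time.{Instant, ZoneOffset}
import scala.collection.mutable

object EventPartitioning {
  // Hypothetical record mirroring the (_id, (event, ts, line)) tuple above.
  final case class Event(id: String, eventType: String, timestamp: String, line: String)

  // Keep one event per _id (the first one seen), like reduceByKey((a, b) => a).
  def dedupe(events: Seq[Event]): Seq[Event] = {
    val seen = mutable.LinkedHashMap.empty[String, Event]
    events.foreach(e => seen.getOrElseUpdate(e.id, e))
    seen.values.toSeq
  }

  // Hive-style partition directory built from the event time, not the arrival time.
  // Assumes ISO-8601 UTC timestamps such as "2015-01-27T23:59:10Z".
  def partitionPath(e: Event): String = {
    val d = Instant.parse(e.timestamp).atZone(ZoneOffset.UTC).toLocalDate
    f"t=${e.eventType}/y=${d.getYear}%04d/m=${d.getMonthValue}%02d/d=${d.getDayOfMonth}%02d"
  }

  // Deduplicate, then bucket the raw lines by their target partition.
  def plan(events: Seq[Event]): Map[String, Seq[String]] =
    dedupe(events).groupBy(partitionPath).map { case (path, es) => path -> es.map(_.line) }
}
```

Keying the buckets by the partition path keeps event type and event date together, so a single pass decides both where a record goes and which schema would apply to it.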
 The other wrinkle here is that Spark seems to want to own the directory it 
 writes to. But it's possible that on any given run we might pick up a few 
 left-over events for a previous day, so we need to be able to handle the 
 situation where we're adding events for a day we've already processed.
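
One possible way to cope with left-over events for a day that has already been processed is to give every run its own subdirectory below the partition, so that a later run only ever adds a sibling directory. This is a sketch under assumptions: the `received=` level and the helper name are hypothetical, and whether the query layer tolerates the extra directory level would need checking.

```scala
import java.time.LocalDate

object RunScopedOutput {
  // Each run writes below its own "received=" subdirectory, so a later run that
  // finds left-over events for an old day does not overwrite earlier output.
  def outputDir(base: String, partition: String, received: LocalDate): String =
    s"${base.stripSuffix("/")}/$partition/received=$received"
}
```

Two runs on different received dates then target different directories for the same event day, which sidesteps the writer's wish to own its output directory.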
 Many thanks,

Re: Is Spark the right tool for me?

2014-12-02 Thread Stadin, Benjamin
To be precise, I want the workflow to be associated with a user, but it doesn't 
need to run as part of, or depend on, a session. I can't run scheduled jobs, 
because a user can potentially upload hundreds of files, which triggers a 
long-running batch import / update process, but he could also make a very small 
upload / update and immediately want to continue working on the (temporary) 
data that he just uploaded. So the duration of the same workflow may vary between 
a few seconds, a minute and hours, depending entirely on the project's size.

So a user can log off and back on to the web site, and the initial upload + 
conversion step may either still be running or be finished. He'll see the progress 
on the web site, and once the initial processing is done he can continue with 
the next step of the import workflow, where he can interactively change some 
things in that temporary data. When he is done, he can hit a "continue" 
button, which triggers another long- or short-running post-processing pipe. Then 
the user can make a final review of the now post-processed data, and after he 
hits a "save" button, a final commit pipe pushes / merges the until-now 
temporary data into some persistent store.
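
The per-user workflow described above can be read as a small state machine. The sketch below is only one way to model it: the state and event names are made up for illustration, and the final commit pipe is collapsed into a single transition. Because the state is plain data, it could be stored per user or project, so that logging off and on again simply reloads it.

```scala
object ImportWorkflow {
  sealed trait State
  case object Converting     extends State // upload + conversion pipe running
  case object Editing        extends State // user changes the temporary data
  case object PostProcessing extends State // pipe triggered by "continue"
  case object Reviewing      extends State // final review of post-processed data
  case object Committed      extends State // temporary data merged into the persistent store

  sealed trait Event
  case object ConversionDone     extends Event
  case object ContinuePressed    extends Event
  case object PostProcessingDone extends Event
  case object SavePressed        extends Event

  // Returns the next state, or None if the event is not valid in the current state.
  def next(state: State, event: Event): Option[State] = (state, event) match {
    case (Converting, ConversionDone)         => Some(Editing)
    case (Editing, ContinuePressed)           => Some(PostProcessing)
    case (PostProcessing, PostProcessingDone) => Some(Reviewing)
    case (Reviewing, SavePressed)             => Some(Committed)
    case _                                    => None
  }
}
```

Rejecting invalid transitions with `None` keeps the web UI from, say, triggering "save" while a conversion is still running.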

You're completely right that I should simplify as much as possible. 
Finding the right mix seems key. I've also considered using Kafka for messaging 
between the web UI and the pipes; I think it will fit. Chaining the pipes together 
as a workflow, and implementing, managing and monitoring these long-running user 
tasks with the locality I need, is still giving me a headache.

Btw, the tiling and indexing is not a problem. My problem is mainly the 
parallelized conversion, polygon creation and cleaning of CAD file data (e.g. 
GRASS, prepair, custom tools). After all parts have been preprocessed and 
gathered in one place, the initial creation of the preview geo file takes a 
fraction of that time (all data is inserted in one transaction, which takes 
somewhere between under a second and 10 seconds for very large projects). It's 
currently not a concern.

(searching for a Kafka+Spark example now)


From: andy petrella
Date: Tuesday, 2 December 2014 10:00
To: Benjamin Stadin,
Subject: Re: Is Spark the right tool for me?

Point 4 looks weird to me, I mean if you intend to have such a workflow 
run in a single session (maybe consider a sessionless architecture). 
Is there such a process for each user? If that's the case, maybe finding a way 
to do it for all users at once would be better (more data but less scheduling).

For the micro updates, considering something like a queue (kestrel? or even 
kafk... whatever, something that works) would be great. That way you take the load 
off the instances, and the updates can be done at their own pace. Also, you can 
reuse it to notify the WMS.
Isn't there a way to do tiling directly? Also, do you need indexes? I mean, do 
you need the full OGIS power, or are some classical operators enough 
(using BBox only, for instance)?

The more you can simplify the better :-D.

These are only my 2c; it's hard to think or react appropriately without knowing 
the whole context.
BTW, to answer your very first question: yes, it looks like Spark will help you!


On Mon Dec 01 2014 at 4:36:44 PM Stadin, Benjamin
Yes, the processing causes the most stress. But this is parallelizable by 
splitting the input source. My problem is that once the heavy preprocessing is 
done, I'm in a "micro-update" mode, so to speak (the user-interactive part of the 
whole workflow). Then the map is rendered directly from the SQLite file by the 
map server instance on that machine. This is actually a favorable setup for me 
in terms of resource consumption and implementation costs (I just need to tell 
the web UI to refresh after something was written to the db, and the map server 
will render the updates without me changing / coding anything). So my workflow 
needs to break out of parallel processing for some time.

Do you think my generalized workflow and tool chain could look like this?

 1.  Pre-process many files in parallel. Gather all results and deploy them 
on one single machine. => Spark coalesce() + Crunch (for splitting input files 
into separate tasks)
 2.  On the machine that holds the preprocessed results, configure a map server 
to connect to the local SQLite source. Do user-interactive micro-updates on 
that file (the web UI gets updated).
 3.  Post-process the files in parallel. => Spark + Crunch
 4.  Design all of the above as a workflow, runnable (or assignable) as part of 
a user session. => Oozie

Do you think this is ok?


From: andy petrella andy.petre...@gmail.com

Is Spark the right tool for me?

2014-12-01 Thread Stadin, Benjamin
Hi all,

I need some advice on whether Spark is the right tool for my zoo. My requirements 
share commonalities with "big data", workflow coordination and "reactive" 
event-driven data processing (as in, for example, Haskell Arrows), which doesn't 
make it any easier to decide on a tool set.

NB: I have asked a similar question on the Storm mailing list, but was 
referred to Spark. I previously thought Storm was closer to my needs, but 
maybe neither is.

To explain my needs it’s probably best to give an example scenario:

 *   A user uploads small files (typically 1-200 files, file size typically 
2-10MB per file)
 *   Files should be converted in parallel on the available nodes. The 
conversion is actually done via native tools, so not much big data 
processing is required, but dynamic parallelization is (for example, splitting 
the conversion step into as many conversion tasks as there are files). The 
conversion typically takes between several minutes and a few hours.
 *   The converted files are gathered and stored in a single database 
(containing geometries for rendering)
 *   Once the db is ready, a web map server is (re-)configured and the user can 
make small updates to the data set via a web UI.
 *   … Some other data processing steps which I leave out for brevity …
 *   Initially there will be only a few concurrent users, but the system should 
be able to scale if needed
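
The "one conversion task per uploaded file, then gather everything in one place" step from the scenario above can be sketched with plain Scala futures. This is a stand-in, not a Spark job: `convert` is a hypothetical placeholder for invoking a native tool, and the one-minute timeout is arbitrary. In Spark, the rough equivalent would be to parallelize the file list with one slice per file, map the conversion over it, and collect the results.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ParallelConversion {
  // Hypothetical stand-in for running a native conversion tool on one file.
  def convert(file: String): String = file + ".converted"

  // Start one task per uploaded file and gather all results in input order.
  def convertAll(files: Seq[String])(implicit ec: ExecutionContext): Seq[String] = {
    val tasks = files.map(f => Future(convert(f)))
    Await.result(Future.sequence(tasks), 1.minute)
  }
}
```

The number of tasks follows the number of files, which is the dynamic parallelization asked for; the gather step is where results would be inserted into the single database.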

My current thoughts:

 *   I should avoid uploading files into the distributed storage during 
conversion, and should probably instead have each conversion filter download the 
file it is actually converting from a shared place. Otherwise it's bad for 
scalability (too many redundant copies of the same temporary files if there 
are many concurrent users and many cluster nodes).
 *   Apache Oozie seems to be an option for chaining my pipes together into a 
workflow. But is it a good fit with Spark? What options do I have with Spark to 
chain a workflow from pipes?
 *   Apache Crunch seems to make it easy to dynamically parallelize tasks 
(Oozie itself can't do this). But I may not need Crunch after all if I have 
Spark, and it also doesn't seem to fit my last problem, described below.
 *   The part that causes me the most headache is the user-interactive db 
update: I am considering using Kafka as a message bus to broker between the web 
UI and a custom db handler (NB: the db is a SQLite file). But what about update 
responsiveness? Won't Spark cause some lag (as opposed to Storm)?
 *   The db handler probably has to be implemented as a long-running, continuous 
task, so that when a user sends some changes the handler writes them to the db 
file. However, I want this to be decoupled from the job. So these file updates 
should be done locally only, on the machine that started the job, for the whole 
lifetime of this user interaction. Does Spark allow such long-running 
tasks to be created dynamically, so that when another (web) user starts a new 
task, a new long-running task is created and run on the same node, which 
eventually ends and triggers the next task? Also, is it possible to identify a 
running task, so that a long-running task can be bound to a session (db handler 
working on local db updates until the task is done), and eventually restarted / 
recreated on failure?
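
The "single long-running db handler per user session" idea from the last bullet can be illustrated on a single JVM as follows. This sketch says nothing about whether Spark can host such a task; it only shows the single-writer pattern. All names are hypothetical, and the `update` callback stands in for a write to the local SQLite file.

```scala
import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
import scala.collection.mutable

object SessionDbHandlers {
  // One single-threaded executor per session: all updates for a session are
  // applied in submission order by exactly one thread, i.e. one writer per db file.
  private val handlers = mutable.Map.empty[String, ExecutorService]

  // Queue an update for the session's handler, creating the handler on first use.
  def submit(sessionId: String)(update: () => Unit): Unit = handlers.synchronized {
    val h = handlers.getOrElseUpdate(sessionId, Executors.newSingleThreadExecutor())
    h.execute(new Runnable { def run(): Unit = update() })
  }

  // Stop a session's handler once its queued updates have been applied
  // (waits at most 10 seconds for them to finish).
  def close(sessionId: String): Unit = {
    val h = handlers.synchronized(handlers.remove(sessionId))
    h.foreach { e => e.shutdown(); e.awaitTermination(10, TimeUnit.SECONDS) }
  }
}
```

Looking the handler up by session id is what lets a running task be identified and bound to a session; restarting on failure would need additional supervision that is not shown.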


Re: Is Spark the right tool for me?

2014-12-01 Thread Stadin, Benjamin
Thank you for mentioning GeoTrellis. I hadn't heard of it before. We have 
many custom tools and steps; I'll check whether our tools fit in. The end result 
is actually a 3D map for native OpenGL-based rendering on iOS / Android [1].

I'm using GeoPackage, which is basically SQLite with R-Tree and a small library 
around it (more lightweight than SpatiaLite). I want to avoid accessing the 
SQLite db from any other machine or task; that's where I thought I could use a 
long-running task which is the only process responsible for updating a locally 
stored SQLite db file. As you also said, SQLite (or almost any other file-based 
db) won't work well over a network. This isn't limited to R-Tree; it is an 
expected limitation because of file locking issues, as also documented by SQLite.

I also thought of doing the same thing when rendering the (web) maps. In 
combination with the db handler which makes the actual changes, I thought I would 
run a map server instance on each node and configure it to add the database 
location as a map source once the task starts.



From: andy petrella
Date: Monday, 1 December 2014 15:07
To: Benjamin Stadin,
Subject: Re: Is Spark the right tool for me?

Not quite sure which geo processing you're doing: is it raster or vector? More 
info would help me to help you further.

Meanwhile I can try to give some hints. For instance, since you need a WMS 
(or the like), have you considered GeoTrellis (go to the batch processing)?

When you say SQLite, do you mean that you're using SpatiaLite? Or is your db not 
a geo one, just plain SQLite? In case you need an r-tree (or related) index, 
your headaches will come from congestion within your database transactions... 
unless you go to a dedicated database like Vertica (just mentioning).



Re: Is Spark the right tool for me?

2014-12-01 Thread Stadin, Benjamin
… Sorry, I forgot to mention why I'm basically bound to SQLite. The workflow 
involves more data processing than I mentioned. There are several tools in the 
chain which either rely on SQLite as an exchange format, or perform processing 
such as data cleaning that runs orders of magnitude faster and / or uses fewer 
resources with SQLite than with a heavyweight db for these specialized (and 
temporary) tasks.

