Re: ETL process design
Hi Danny,

What you describe sounds like you may also want to consider using Spring XD instead, at least for the file-centric stuff.

Regards
Ben

Sent from my iPad

On 28.01.2015 at 10:42, Danny Yates da...@codeaholics.org wrote:

Hi,

My apologies for what has ended up as quite a long email with a lot of open-ended questions, but, as you can see, I'm really struggling to get started and would appreciate some guidance from people with more experience.

I'm new to Spark and big data in general, and I'm struggling with what I suspect is actually a fairly simple problem.

For background, this process will run on an EMR cluster in AWS. My files are all in S3, but S3 access is pretty straightforward in that environment, so I'm not overly concerned about that at the moment.

I have a process (or rather, a number of processes) that drops JSON events into files in S3 directories structured by the date the events arrived. I say "JSON" loosely: there is one JSON message per line, rather than one per file. That is, they are amenable to being loaded with sqlContext.jsonFile().

The directory structure is s3://bucket/path/-mm-dd/many-files-here, where -mm-dd is the received date of the events. Depending on the environment, there could be 4,000-5,000 files in each directory, each containing up to 3,000 lines (events). So there is plenty of scope for parallelism. In general, there will be something like 2,000,000 events per day initially.

The incoming events are of different types (page views, item purchases, etc.) but are currently all bundled into the same set of input files, so the JSON is not uniform across the lines within each file. I'm open to changing this and having the events broken out into different files by event type, if that helps. Oh, and there could be duplicates too, which will need removing. :-)

My challenge is to take these files and transfer them into a longer-term storage format suitable for both overnight analytics and ad-hoc querying.
I'm happy for this process to run just once a day, say at 1am, and process the whole of the previous day's received data. I'm thinking that Parquet files stored in Hive-like partitions would be a sensible way forward:

s3://bucket/different-path/t=type/y=/m=mm/d=dd/whatever.parquet

Here, the year (y=), mm and dd represent the time the event happened, rather than the time it arrived. Does that sound sensible? Do you have any other recommendations?

So I need to read each line, parse the JSON, deduplicate the data, decide which event type each line is, and write it to the right file in the right directory. I'm struggling with... well... most of it, if I'm honest. Here's what I have so far:

    import org.apache.spark.SparkContext._ // pair-RDD implicits such as reduceByKey (needed before Spark 1.3)

    val data = sc.textFile("s3:///-mm-dd/*") // load all files for the given received date

    // Deduplicate
    val dedupe = data.map(line => {
      val json = new com.fasterxml.jackson.databind.ObjectMapper().readTree(line)
      val _id = json.get("_id").asText()        // _id is a key that can be used to dedupe
      val event = json.get("event").asText()    // event is the event type
      val ts = json.get("timestamp").asText()   // timestamp is when the event happened
      (_id, (event, ts, line)) // having event, ts and line at this point should save time later
    }).reduceByKey((a, b) => a) // for any pair of lines with the same _id, pick one arbitrarily

At this point, I guess I'm going to have to split this apart by event type (I'm happy to have a priori knowledge of the event types) and formally parse each line using a schema to get a SchemaRDD so I can write out Parquet files. I have exactly zero idea how to approach this part.

The other wrinkle is that Spark seems to want to own the directory it writes to. But on any given run we might pick up a few left-over events for a previous day, so we need to handle the situation where we're adding events for a day we've already processed.

Many thanks,
Danny.
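The steps Danny lists (deduplicate, pick the event-time partition, add late events to a day that already exists) can be tried out locally before wiring them into Spark. Below is a minimal sketch in plain Scala collections, not Spark; `ParsedEvent`, `EtlSketch` and the per-run file naming are hypothetical. It assumes the `timestamp` field is an ISO-8601 UTC instant and that the year directory holds a four-digit year. Unlike `reduceByKey((a, b) => a)`, the dedupe here deterministically keeps the first line seen per `_id`.

```scala
import java.time.{Instant, ZoneOffset}

// Hypothetical record for one already-parsed line; fields mirror the
// "_id", "event" and "timestamp" keys used in the code above.
final case class ParsedEvent(id: String, eventType: String, timestamp: String, line: String)

object EtlSketch {
  // Keep one event per _id (the first one seen, in input order).
  def dedupe(events: Seq[ParsedEvent]): Seq[ParsedEvent] = {
    val seen = scala.collection.mutable.LinkedHashMap.empty[String, ParsedEvent]
    events.foreach(e => seen.getOrElseUpdate(e.id, e))
    seen.values.toSeq
  }

  // Hive-style directory built from the time the event happened (assumed to be
  // an ISO-8601 UTC instant such as "2015-01-27T23:59:01Z"), not the received date.
  def partitionDir(e: ParsedEvent): String = {
    val d = Instant.parse(e.timestamp).atZone(ZoneOffset.UTC).toLocalDate
    f"t=${e.eventType}/y=${d.getYear}%04d/m=${d.getMonthValue}%02d/d=${d.getDayOfMonth}%02d"
  }

  // Route each deduplicated event to its directory; a late event for an
  // earlier day simply maps to that day's (possibly existing) directory.
  def route(events: Seq[ParsedEvent]): Map[String, Seq[ParsedEvent]] =
    dedupe(events).groupBy(partitionDir)

  // Hypothetical naming scheme: one new file per run inside each partition, so
  // a later run adds a file next to earlier ones instead of replacing them.
  def outputPath(root: String, dir: String, runId: String): String =
    s"$root/$dir/part-$runId.parquet"
}
```

Per-record functions such as `partitionDir` could be called inside an RDD `map`. Later Spark versions (1.4+) also added `DataFrameWriter.partitionBy` together with `SaveMode.Append`, which covers much of the "adding to a day already written" case directly.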
- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Is Spark the right tool for me?
To be precise, I want the workflow to be associated with a user, but it doesn't need to run as part of, or depend on, a session. I can't run scheduled jobs, because a user can potentially upload hundreds of files, which triggers a long-running batch import/update process, but he could also make a very small upload/update and immediately want to continue working on the (temporary) data he just uploaded. So the duration of the same workflow may vary between a few seconds, a minute and hours, depending entirely on the project's size.

So a user can log off and on again to the web site, and the initial upload + conversion step may either still be running or be finished. He'll see the progress on the web site, and once the initial processing is done he can continue with the next step of the import workflow, in which he can interactively change some things in that temporary data. When he is done, he can hit a "continue" button, which again triggers a long- or short-running post-processing pipe. Then the user can make a final review of the post-processed data, and after he hits a "save" button a final commit pipe pushes/merges the until-now temporary data into some persistent store.

You're completely right that I should simplify as much as possible. Finding the right mix seems key. I've also considered using Kafka to message between the web UI and the pipes; I think it will fit. Chaining the pipes together as a workflow, and implementing, managing and monitoring these long-running user tasks with the locality I need, is still giving me a headache.

Btw, the tiling and indexing is not a problem. My problem is mainly in parallelized conversion, polygon creation, and cleaning of CAD file data (e.g. GRASS, prepair, custom tools).
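The import workflow described above behaves like a per-user state machine. Some transitions fire when a background pipe finishes, others when the user presses "continue" or "save". A minimal sketch follows; the stage and signal names are hypothetical labels for the steps in the text, and persisting the current stage per user or project (so a user can log off and back on) is assumed to happen elsewhere.

```scala
// Hypothetical stages of one user's import workflow.
sealed trait Stage
case object Converting     extends Stage // initial upload + conversion pipe (seconds to hours)
case object Editing        extends Stage // user changes the temporary data interactively
case object PostProcessing extends Stage // pipe triggered by the "continue" button
case object Reviewing      extends Stage // final review of the post-processed data
case object Committing     extends Stage // pipe triggered by "save": merge into the persistent store
case object Done           extends Stage

// Hypothetical inputs that drive transitions.
sealed trait Signal
case object PipeFinished extends Signal // a background pipe completed
case object Continue     extends Signal // user pressed "continue"
case object Save         extends Signal // user pressed "save"

object ImportWorkflow {
  // Next stage, or None if the signal is not valid in the current stage
  // (for example "save" while the conversion is still running).
  def next(stage: Stage, signal: Signal): Option[Stage] = (stage, signal) match {
    case (Converting, PipeFinished)     => Some(Editing)
    case (Editing, Continue)            => Some(PostProcessing)
    case (PostProcessing, PipeFinished) => Some(Reviewing)
    case (Reviewing, Save)              => Some(Committing)
    case (Committing, PipeFinished)     => Some(Done)
    case _                              => None
  }
}
```

Keeping the transitions explicit like this makes it easy to show progress on the web site, regardless of which tool (Oozie, a Kafka consumer or something else) actually starts the pipes.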
After all parts have been preprocessed and gathered in one place, the initial creation of the preview geo file takes only a fraction of the time (inserting all data in one transaction takes somewhere between under a second and 10 seconds for very large projects). It's currently not a concern.

(searching for a Kafka+Spark example now)

Cheers
Ben

From: andy petrella andy.petre...@gmail.com
Date: Tuesday, 2 December 2014 10:00
To: Benjamin Stadin benjamin.sta...@heidelberg-mobil.com, user@spark.apache.org
Subject: Re: Is Spark the right tool for me?

Point 4 looks weird to me, I mean if you intend to have such a workflow run in a single session (maybe consider a sessionless architecture). Is such a process run for each user? If so, maybe finding a way to do it for all users at once would be better (more data but less scheduling).

For the micro updates, consider something like a queue (kestrel? or even kafk... whatever, something that works). That takes the load off the instances, and the updates can be done at their own pace. You can also reuse it to notify the WMS.

Isn't there a way to do tiling directly? Also, do you need indexes, I mean do you need the full OGIS power, or are some classical operators enough (using BBox only, for instance)? The more you can simplify, the better :-D.

These are only my 2c; it's hard to think or react appropriately without knowing the whole context.

BTW, to answer your very first question: yes, it looks like Spark will help you!

cheers,
andy

On Mon Dec 01 2014 at 4:36:44 PM Stadin, Benjamin benjamin.sta...@heidelberg-mobil.com wrote:

Yes, the processing causes the most stress. But this is parallelizable by splitting the input source.
My problem is that once the heavy preprocessing is done, I'm in a "micro-update" mode, so to speak (the user-interactive part of the whole workflow). The map is then rendered directly from the SQLite file by the map server instance on that machine. This is actually a favorable setup for me in terms of resource consumption and implementation cost (I just need to tell the web UI to refresh after something has been written to the db, and the map server will render the updates without me changing or coding anything). So my workflow needs to break out of parallel processing for some time.

Do you think my generalized workflow and tool chain could look like this?

1. Pre-process many files in parallel. Gather all results and deploy them on one single machine. => Spark coalesce() + Crunch (for splitting input files into separate tasks)
2. On the machine holding the preprocessed results, configure a map server to connect to the local SQLite source. Do user-interactive micro-updates on that file (the web UI gets updated).
3. Post-process the files in parallel. => Spark + Crunch
4. Design all of the above as a workflow, runnable (or assignable) as part of a user session. => Oozie

Do you think this is ok?

~Ben
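Step 1 has a simple fan-out/gather shape: one conversion task per uploaded file, then all results collected in one place. The sketch below shows that shape on a single machine with plain Scala futures rather than Spark or Crunch. `convert` is a hypothetical stand-in for invoking the native conversion tool (for example via `scala.sys.process`), and the one-hour timeout is an arbitrary assumption. On a Spark cluster the same shape would roughly be `sc.parallelize(files, files.size).map(...)` followed by `collect()`.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ParallelConvert {
  final case class Converted(input: String, output: String)

  // Hypothetical stand-in for running a native conversion tool on one file.
  def convert(file: String): Converted = Converted(file, file + ".converted")

  // Fan out one task per file on a bounded pool, then gather all results.
  def convertAll(files: Seq[String], parallelism: Int): Seq[Converted] = {
    val pool = Executors.newFixedThreadPool(parallelism)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      val tasks = files.map(f => Future(convert(f))) // one task per uploaded file
      Await.result(Future.sequence(tasks), 1.hour)   // gather; results keep input order
    } finally pool.shutdown()
  }
}
```

The gathered results would then be handed to whatever builds the single SQLite file on the target machine (step 2).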
Is Spark the right tool for me?
Hi all,

I need some advice on whether Spark is the right tool for my zoo. My requirements share commonalities with "big data", workflow coordination and "reactive" event-driven data processing (as in, for example, Haskell Arrows), which doesn't make it any easier to decide on a tool set.

NB: I asked a similar question on the Storm mailing list, but was referred to Spark. I previously thought Storm was closer to my needs, but maybe neither is.

To explain my needs, it's probably best to give an example scenario:

* A user uploads small files (typically 1-200 files, typically 2-10MB per file).
* Files should be converted in parallel on available nodes. The conversion is actually done by native tools, so not much big data processing is required, but rather dynamic parallelization (for example, splitting the conversion step into as many conversion tasks as there are files). The conversion typically takes between several minutes and a few hours.
* The converted files are gathered and stored in a single database (containing geometries for rendering).
* Once the db is ready, a web map server is (re-)configured and the user can make small updates to the data set via a web UI.
* … Some other data processing steps which I leave out for brevity …
* Initially there will be only a few concurrent users, but the system should be able to scale if needed.

My current thoughts:

* I should avoid uploading files into the distributed storage during conversion; rather, each conversion filter should probably download the file it is actually converting from a shared place. Otherwise it's bad for scalability (too many redundant copies of the same temporary files if there are many concurrent users and many cluster nodes).
* Apache Oozie seems an option for chaining my pipes together into a workflow. But is it a good fit with Spark? What options do I have with Spark for chaining pipes into a workflow?
* Apache Crunch seems to make it easy to dynamically parallelize tasks (Oozie itself can't do this). But I may not need Crunch after all if I have Spark, and it also doesn't seem to fit my last problem, below.
* The part that gives me the most headache is the user-interactive db update: I'm considering using Kafka as a message bus to broker between the web UI and a custom db handler (NB: the db is a SQLite file). But what about update responsiveness: won't Spark introduce some lag (as opposed to Storm)?
* The db handler probably has to be implemented as a long-running task, so that when a user sends some changes, the handler writes them to the db file. However, I want this to be decoupled from the job. So these file updates should be done locally, only on the machine that started the job, for the whole lifetime of this user interaction. Does Spark allow creating such long-running tasks dynamically, so that when another (web) user starts a new task, a new long-running task is created and run on the same node, which eventually ends and triggers the next task? Also, is it possible to identify a running task, so that a long-running task can be bound to a session (the db handler working on local db updates until the task is done), and eventually restarted/recreated on failure?

~Ben
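The db handler idea amounts to a single writer: one long-running worker per user session owns the local SQLite file and is the only code that writes to it, while the web UI (or a Kafka consumer, per andy's queue suggestion) only enqueues changes. The sketch below shows that pattern independently of Spark or Kafka. `Update`, `DbHandler` and the `write` callback are hypothetical; a real handler would execute the change against the local SQLite/GeoPackage file instead of calling a plain function.

```scala
import java.util.concurrent.LinkedBlockingQueue

// Hypothetical change request coming from the web UI.
final case class Update(sessionId: String, sql: String)

final class DbHandler(write: Update => Unit) {
  // None acts as a "poison pill" telling the worker to stop.
  private val queue = new LinkedBlockingQueue[Option[Update]]()

  private val worker = new Thread(new Runnable {
    def run(): Unit = {
      var running = true
      while (running) {
        queue.take() match {
          case Some(u) => write(u) // only this thread writes, one update at a time, in order
          case None    => running = false
        }
      }
    }
  })
  worker.start()

  // Callers never touch the db file; they only enqueue.
  def submit(u: Update): Unit = queue.put(Some(u))

  // Stops after every previously submitted update has been written.
  def shutdown(): Unit = {
    queue.put(None)
    worker.join()
  }
}
```

Because all writes go through one thread on the machine holding the file, the file-locking problems of SQLite over a network do not arise; restarting a failed handler would mean recreating it and replaying unacknowledged updates, which this sketch does not cover.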
Re: Is Spark the right tool for me?
Thank you for mentioning GeoTrellis. I hadn't heard of it before. We have many custom tools and steps; I'll check whether our tools fit in. The end result is actually a 3D map for native OpenGL-based rendering on iOS/Android [1]. I'm using GeoPackage, which is basically SQLite with an R-Tree and a small library around it (more lightweight than SpatiaLite).

I want to avoid accessing the SQLite db from any other machine or task; that's why I thought I could use a long-running task that is the only process responsible for updating a local-only SQLite db file. As you also said, SQLite (or almost any other file-based db) won't work well over a network. This isn't limited to the R-Tree; it's an expected limitation because of file-locking issues, as also documented by SQLite.

I also thought of doing the same thing when rendering the (web) maps: in combination with the db handler that makes the actual changes, I'd run a map server instance on each node and configure it to add the database location as a map source once the task starts.

Cheers
Ben

[1] http://www.deep-map.com

From: andy petrella andy.petre...@gmail.com
Date: Monday, 1 December 2014 15:07
To: Benjamin Stadin benjamin.sta...@heidelberg-mobil.com, user@spark.apache.org
Subject: Re: Is Spark the right tool for me?

I'm not quite sure which kind of geo processing you're doing: is it raster or vector? More info would help me help you further. Meanwhile I can try to give some hints. For instance, have you considered GeoMesa (http://www.geomesa.org/2014/08/05/spark/)? Since you need a WMS (or similar), have you considered GeoTrellis (http://geotrellis.io/, see the batch processing)? When you say SQLite, do you mean you're using SpatiaLite? Or is your db not a geo one, just plain SQLite?
In case you need an R-tree (or related) index, your headaches will come from congestion within your database transactions... unless you go for a dedicated database like Vertica (just mentioning).

kr,
andy

On Mon Dec 01 2014 at 2:49:44 PM Stadin, Benjamin benjamin.sta...@heidelberg-mobil.com wrote:

Hi all, I need some advice on whether Spark is the right tool for my zoo. [...]
Re: Is Spark the right tool for me?
… Sorry, I forgot to mention why I'm basically bound to SQLite. The workflow involves more data processing than I mentioned. There are several tools in the chain that rely on SQLite as an exchange format, and some processing steps, such as data cleaning, run orders of magnitude faster, or with far fewer resources, on SQLite than on a heavyweight db for these specialized (and temporary) tasks.
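andy's question of whether "BBox only" operators would be enough comes down to which spatial predicate the queries need. The bounding-box intersection test is the predicate an R-tree index (such as the one GeoPackage maintains in SQLite) is built to answer quickly. A minimal sketch of that predicate as a plain linear scan, with hypothetical `BBox` and `Feature` types, is shown here:

```scala
// Axis-aligned bounding box; hypothetical type for illustration.
final case class BBox(minX: Double, minY: Double, maxX: Double, maxY: Double) {
  // Two boxes intersect unless one lies entirely to one side of the other.
  def intersects(o: BBox): Boolean =
    minX <= o.maxX && o.minX <= maxX && minY <= o.maxY && o.minY <= maxY
}

final case class Feature(id: Long, bbox: BBox)

object BBoxQuery {
  // Linear scan: an R-tree returns the same candidates without visiting every feature.
  def query(features: Seq[Feature], window: BBox): Seq[Feature] =
    features.filter(_.bbox.intersects(window))
}
```

If bounding-box candidates are all the map rendering and editing need, the index already built into GeoPackage covers it. Exact geometric predicates would be a second, more expensive filtering step on those candidates.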