+1 for writing the Spark output to Kafka. You can then hang multiple
compute/storage frameworks off Kafka. I am using a similar pipeline to feed
Elasticsearch and HDFS in parallel. This allows modularity: you can take down
Elasticsearch or HDFS for maintenance without losing data (except in some
edge cases).

You can even chain other Spark Streaming apps off Kafka to modularize
your processing pipeline, so you don't have one single big Spark app doing
all the processing.
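
To make the "take one consumer down without losing data" point concrete, here is a toy sketch in plain Java. It is not Kafka and uses no Kafka API: `FanOutLog`, `append`, `poll` and the consumer names are all made up for illustration. It only models the property being relied on, namely an append-only log where each downstream consumer keeps its own read offset. A real Kafka topic only keeps records for its configured retention period, which may be one of the edge cases mentioned above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy in-memory stand-in for a topic: an append-only log plus one read offset per consumer. */
public class FanOutLog {
    private final List<String> log = new ArrayList<>();
    private final Map<String, Integer> offsets = new HashMap<>();

    /** Producer side: the upstream job appends each output record. */
    public synchronized void append(String record) {
        log.add(record);
    }

    /** Consumer side: return every record this consumer has not seen yet and advance its offset. */
    public synchronized List<String> poll(String consumer) {
        int from = offsets.getOrDefault(consumer, 0);
        List<String> batch = new ArrayList<>(log.subList(from, log.size()));
        offsets.put(consumer, log.size());
        return batch;
    }

    public static void main(String[] args) {
        FanOutLog topic = new FanOutLog();
        topic.append("a");
        System.out.println(topic.poll("hdfs-writer")); // prints [a]
        // The (hypothetical) Elasticsearch indexer is down while these arrive.
        topic.append("b");
        topic.append("c");
        System.out.println(topic.poll("hdfs-writer")); // prints [b, c]
        // When the indexer comes back it catches up from its own offset.
        System.out.println(topic.poll("es-indexer"));  // prints [a, b, c]
    }
}
```

Neither consumer knows about the other, which is what lets each one be stopped, restarted or scaled on its own.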



On Wed, Feb 18, 2015 at 3:34 PM, Jose Fernandez <jfernan...@sdl.com> wrote:

>  Thanks for the advice folks, it is much appreciated. This seems like a
> pretty unfortunate design flaw. My team was surprised by it.
>
>
>
> I’m going to drop the two-step process and do it all in a single step
> until we get Kafka online.
>
>
>
> *From:* Sean Owen [mailto:so...@cloudera.com]
> *Sent:* Wednesday, February 18, 2015 1:53 AM
> *To:* Emre Sevinc
> *Cc:* Jose Fernandez; user@spark.apache.org
> *Subject:* Re: Spark Streaming output cannot be used as input?
>
>
>
> To clarify, sometimes in the world of Hadoop people freely refer to an
> output 'file' when it's really a directory containing 'part-*' files which
> are pieces of the file. It's imprecise but that's the meaning. I think the
> scaladoc may be referring to 'the path to the file, which includes this
> parent dir, is generated ...' In an inherently distributed system, you want
> to distribute writes and reads, so big "files" are really made of logical
> files within a directory.
>
>
>
> There is a JIRA open to support nested dirs which has been languishing:
> https://issues.apache.org/jira/browse/SPARK-3586
>
> I'm hoping to pursue that again with help from tdas after 1.3.
>
> That's probably the best solution.
>
>
>
> An alternative is to not use the file system as a sort of message queue,
> and instead use something like Kafka. It has a lot of other benefits but
> maybe it's not feasible to add this to your architecture.
>
>
>
> You can merge the files with HDFS APIs without much trouble. The dirs will
> be named consistently according to time and are something you can also
> query for.
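
A minimal sketch of the merge step, assuming the "prefix-TIME_IN_MS.suffix" directory naming quoted further down the thread. To keep it runnable without a cluster it uses `java.nio.file` on the local filesystem as a stand-in for the HDFS APIs; the class name `PartFileMerger`, both method names and the sample prefix, suffix and timestamp are made up for illustration.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class PartFileMerger {

    /** Directory name for one batch, following the "prefix-TIME_IN_MS.suffix" pattern. */
    public static String batchDirName(String prefix, long timeMs, String suffix) {
        return prefix + "-" + timeMs + "." + suffix;
    }

    /** Concatenate all part-* files of one batch directory, in name order, into target. */
    public static void mergeParts(Path batchDir, Path target) throws IOException {
        List<Path> parts = new ArrayList<>();
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(batchDir, "part-*")) {
            for (Path p : stream) {
                parts.add(p);
            }
        }
        Collections.sort(parts); // part-00000, part-00001, ...
        try (OutputStream out = Files.newOutputStream(target)) {
            for (Path part : parts) {
                Files.copy(part, out);
            }
        }
    }

    public static void main(String[] args) throws IOException {
        Path root = Files.createTempDirectory("stream-out");
        Path batchDir = Files.createDirectory(
                root.resolve(batchDirName("events", 1424246040000L, "json")));
        Files.write(batchDir.resolve("part-00000"), "first\n".getBytes(StandardCharsets.UTF_8));
        Files.write(batchDir.resolve("part-00001"), "second\n".getBytes(StandardCharsets.UTF_8));
        Files.write(batchDir.resolve("_SUCCESS"), new byte[0]); // non-part file, skipped by the glob

        Path merged = root.resolve("events-1424246040000.merged.json");
        mergeParts(batchDir, merged);
        System.out.print(new String(Files.readAllBytes(merged), StandardCharsets.UTF_8));
    }
}
```

On HDFS the same listing and copying would go through the Hadoop `FileSystem` API rather than `java.nio.file`. Writing the merged file outside the batch directory keeps it from being picked up as another part file.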
>
>
>
> Making 1 partition has implications for parallelism of your job.
>
>
>
> Emre, I think I see what you're getting at but you have the map +
> materialize pattern which I think doesn't have the right guarantees about
> re-execution. Why not foreachRDD?
>
>
>
> Yes you can also consider collecting the whole RDD in foreachRDD and doing
> what you like, including writing to one file. But that would only work if
> the data is always small in each RDD.
>
>
>
>
>
>
>   SDL PLC confidential, all rights reserved. If you are not the intended
> recipient of this mail SDL requests and requires that you delete it without
> acting upon or copying any of its contents, and we further request that you
> advise us.
>
> SDL PLC is a public limited company registered in England and Wales.
> Registered number: 02675207.
> Registered address: Globe House, Clivemont Road, Maidenhead, Berkshire SL6
> 7DY, UK.
>
> On Wed, Feb 18, 2015 at 8:50 AM, Emre Sevinc <emre.sev...@gmail.com>
> wrote:
>
> Hello Jose,
>
> We hit the same issue a couple of months ago. It is possible to write
> directly to files instead of creating directories, but it is not
> straightforward, and I haven't seen any clear demonstration in books,
> tutorials, etc.
>
> We do something like:
>
> SparkConf sparkConf = new SparkConf().setAppName(appName);
> JavaStreamingContext ssc =
>     new JavaStreamingContext(sparkConf, new Duration(batchInterval));
> JavaDStream<String> stream = MyModuleApp.initializeJob(ssc);
> MyModuleApp.process(stream);
>
>
>
> And then in the process method:
>
> @Override public void process(JavaDStream<String> inStream) {
>
>     JavaDStream<String> json = inStream.map(
>         new MyModuleWorker(jsonSchemaName, validatedJSONoutputDir,
>                            rejectedJSONoutputDir));
>
>     forceOutput(json);
>
>   }
>
>  This, in turn, calls the following (I've removed the irrelevant lines to 
> focus on writing):
>
>
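> public class MyModuleWorker implements Function<String,String> {
>
>   // Function.call is declared to throw Exception, which covers the
>   // IOException thrown by writeJSON
>   @Override
>   public String call(String json) throws Exception {
>
>     // process the data and then write it
>     writeJSON(json, validatedJSONoutputDir_);
>
>     // the return value was elided in the original; passing the record
>     // through unchanged is an assumption
>     return json;
>
>   }
>
> }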
>
> And the writeJSON method is:
>
> public static final void writeJSON(String json, String jsonDirPath) throws IOException {
>
>     // Write under a temporary ".json.tmp" name first
>     String jsonFileName = jsonDirPath + "/" + UUID.randomUUID().toString() + ".json.tmp";
>     URI uri = URI.create(jsonFileName);
>
>     Configuration conf = new Configuration();
>     FileSystem fileSystem = FileSystem.get(uri, conf);
>
>     // try-with-resources closes the stream even if the write fails
>     try (FSDataOutputStream out = fileSystem.create(new Path(uri))) {
>         out.write(json.getBytes(StandardCharsets.UTF_8));
>     }
>
>     // Rename to a final ".json" name once the file is complete
>     fileSystem.rename(new Path(uri),
>             new Path(URI.create(jsonDirPath + "/" + UUID.randomUUID().toString() + ".json")));
>
>   }
>
>
>
> Using a similar technique you might be able to achieve your objective.
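
The point of the `.json.tmp` step above can be sketched without a cluster. The following is a local-filesystem analogue using `java.nio.file` in place of the Hadoop `FileSystem` calls; the class name `TmpThenRename`, both method names and the `*.json` reader-side filter are assumptions made for illustration. The idea is that anything watching the directory for `*.json` only ever sees files that have been completely written.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

public class TmpThenRename {

    /** Write json to a ".json.tmp" file, then rename it to ".json" in one step. */
    public static Path writeJson(String json, Path dir) throws IOException {
        String base = UUID.randomUUID().toString();
        Path tmp = dir.resolve(base + ".json.tmp");
        Path done = dir.resolve(base + ".json");
        Files.write(tmp, json.getBytes(StandardCharsets.UTF_8));
        // ATOMIC_MOVE fails with an exception rather than falling back to a
        // non-atomic copy, so a reader never observes a half-renamed file.
        return Files.move(tmp, done, StandardCopyOption.ATOMIC_MOVE);
    }

    /** Reader side: list only finished files; "*.json" does not match "*.json.tmp". */
    public static List<Path> listFinished(Path dir) throws IOException {
        List<Path> result = new ArrayList<>();
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir, "*.json")) {
            for (Path p : stream) {
                result.add(p);
            }
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("json-out");
        // Simulate a write that is still in progress.
        Files.write(dir.resolve("in-progress.json.tmp"), new byte[0]);
        writeJson("{\"status\":\"ok\"}", dir);
        System.out.println(listFinished(dir).size()); // prints 1
    }
}
```

This sketch reuses one UUID for the temporary and final names, which makes the two easy to correlate; the quoted code generates a fresh UUID for the final name, and either works.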
>
> Kind regards,
>
> Emre Sevinç
> http://www.bigindustries.be/
>
>
>
>
>
> On Wed, Feb 18, 2015 at 4:32 AM, Jose Fernandez <jfernan...@sdl.com>
> wrote:
>
> Hello folks,
>
>
>
> Our intended use case is:
>
> - Spark Streaming app #1 reads from RabbitMQ and outputs to HDFS
>
> - Spark Streaming app #2 reads #1’s output and stores the data
>   into Elasticsearch
>
>
>
> The idea behind this architecture is that if Elasticsearch is down due to
> an upgrade or system error we don’t have to stop reading messages from the
> queue. We could also scale each process separately as needed.
>
>
>
> After a few hours of research my understanding is that Spark Streaming
> outputs files in a *directory* for which you provide the prefix and suffix.
> This is despite the ScalaDoc for DStream saveAsObjectFiles suggesting
> otherwise:
>
>
>
>   /**
>    * Save each RDD in this DStream as a Sequence file of serialized objects.
>    * The file name at each batch interval is generated based on `prefix` and
>    * `suffix`: "prefix-TIME_IN_MS.suffix".
>    */
>
>
>
> Spark Streaming can monitor an HDFS directory for files but subfolders are
> not supported. So as far as I can tell, it is not possible to use Spark
> Streaming output as input for a different Spark Streaming app without
> somehow performing a separate operation in the middle.
>
>
>
> Am I missing something obvious? I’ve read some suggestions like using
> Hadoop to merge the directories (whose names I don’t see how you would
> know) and to reduce the partitions to 1 (which wouldn’t help).
>
>
>
> Any other suggestions? What is the expected pattern a developer would
> follow that would make Spark Streaming’s output format usable?
>
>
>
>
>
>
