spark.cleaner.ttl for 1.4.1

2015-11-30 Thread Michal Čizmazia
Does *spark.cleaner.ttl* still need to be used for Spark *1.4.1* long-running
streaming jobs? Or does *ContextCleaner* alone do all the cleaning?


spark.deploy.zookeeper.url

2015-10-22 Thread Michal Čizmazia
Does the spark.deploy.zookeeper.url configuration work correctly when I
point it to a single virtual IP address with multiple hosts behind it (load
balancer or round robin)?

https://spark.apache.org/docs/latest/spark-standalone.html#high-availability

ZooKeeper FAQ also discusses this topic:

https://wiki.apache.org/hadoop/ZooKeeper/FAQ#A8

Thanks!


Re: Graceful shutdown drops processing in Spark Streaming

2015-10-07 Thread Michal Čizmazia
Thanks! Done.

https://issues.apache.org/jira/browse/SPARK-10995

On 7 October 2015 at 21:24, Tathagata Das  wrote:

> Aaah, interesting, you are doing a 15-minute slide duration. Yeah,
> internally the streaming scheduler waits for the last "batch" interval
> which has data to be processed, but if there is a sliding interval (i.e. 15
> mins) that is higher than the batch interval, then that might not be run. This
> is indeed a bug and should be fixed. Mind setting up a JIRA and assigning
> it to me?
>
> On Wed, Oct 7, 2015 at 8:33 AM, Michal Čizmazia  wrote:
>
>> After triggering the graceful shutdown on the following application, the
>> application stops before the windowed stream reaches its slide duration. As
>> a result, the data is not completely processed (i.e. saveToMyStorage is not
>> called) before shutdown.
>>
>> According to the documentation, graceful shutdown should ensure that the
>> data, which has been received, is completely processed before shutdown.
>>
>> https://spark.apache.org/docs/1.4.0/streaming-programming-guide.html#upgrading-application-code
>>
>> Spark version: 1.4.1
>>
>> Code snippet:
>>
>> Function0 factory = () -> {
>> JavaStreamingContext context = new JavaStreamingContext(sparkConf,
>> Durations.minutes(1));
>> context.checkpoint("/test");
>> JavaDStream records =
>> context.receiverStream(myReliableReceiver).flatMap(...);
>> records.persist(StorageLevel.MEMORY_AND_DISK());
>> records.foreachRDD(rdd -> { rdd.count(); return null; });
>> records
>> .window(Durations.minutes(15), Durations.minutes(15))
>> .foreachRDD(rdd -> saveToMyStorage(rdd));
>> return context;
>> };
>>
>> try (JavaStreamingContext context =
>> JavaStreamingContext.getOrCreate("/test", factory)) {
>> context.start();
>> waitForShutdownSignal();
>> Boolean stopSparkContext = true;
>> Boolean stopGracefully = true;
>> context.stop(stopSparkContext, stopGracefully);
>> }
>>
>>
>


Graceful shutdown drops processing in Spark Streaming

2015-10-07 Thread Michal Čizmazia
After triggering the graceful shutdown on the following application, the
application stops before the windowed stream reaches its slide duration. As
a result, the data is not completely processed (i.e. saveToMyStorage is not
called) before shutdown.

According to the documentation, graceful shutdown should ensure that the
data, which has been received, is completely processed before shutdown.
https://spark.apache.org/docs/1.4.0/streaming-programming-guide.html#upgrading-application-code

Spark version: 1.4.1

Code snippet:

Function0<JavaStreamingContext> factory = () -> {
    JavaStreamingContext context =
        new JavaStreamingContext(sparkConf, Durations.minutes(1));
    context.checkpoint("/test");
    JavaDStream records =
        context.receiverStream(myReliableReceiver).flatMap(...);
    records.persist(StorageLevel.MEMORY_AND_DISK());
    records.foreachRDD(rdd -> { rdd.count(); return null; });
    records
        .window(Durations.minutes(15), Durations.minutes(15))
        .foreachRDD(rdd -> saveToMyStorage(rdd));
    return context;
};

try (JavaStreamingContext context =
        JavaStreamingContext.getOrCreate("/test", factory)) {
    context.start();
    waitForShutdownSignal();
    Boolean stopSparkContext = true;
    Boolean stopGracefully = true;
    context.stop(stopSparkContext, stopGracefully);
}


Re: WAL on S3

2015-09-23 Thread Michal Čizmazia
Thanks Steve!

FYI: S3 now supports GET-after-PUT consistency for new objects in all
regions, including US Standard



https://aws.amazon.com/about-aws/whats-new/2015/08/amazon-s3-introduces-new-usability-enhancements/

On 23 September 2015 at 13:12, Steve Loughran 
wrote:

>
> On 23 Sep 2015, at 14:56, Michal Čizmazia  wrote:
>
> To get around the fact that flush does not work in S3, my custom WAL
> implementation stores a separate S3 object per each WriteAheadLog.write
> call.
>
> Do you see any gotchas with this approach?
>
>
>
> nothing obvious.
>
> the blob is PUT in the close() call; once that operation has completed,
> it's in S3. Any attempt to open that file to read will immediately
> succeed, now even in US-east if you set the right endpoint:
>
> https://forums.aws.amazon.com/ann.jspa?annID=3112
> http://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html#Regions
>
> If you can avoid listing operations or overwrites, you avoid the fun there.
>
> You do have to bear in mind that the duration of stream.close() is now
> O(bytes) and may fail -a lot of code assumes it is instant and always
> works...
>


Re: WAL on S3

2015-09-23 Thread Michal Čizmazia
To get around the fact that flush does not work in S3, my custom WAL
implementation stores a separate S3 object per each WriteAheadLog.write
call.

Do you see any gotchas with this approach?
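
For reference, a minimal sketch of that idea in Java (AWS SDK v1; the bucket
name, class names and the in-memory key index are illustrative only; a real
implementation would keep the index in an external store such as DynamoDB).
The (SparkConf, logDirectory) constructor and the
spark.streaming.receiver.writeAheadLog.class setting follow WriteAheadLogUtils,
which is reportedly not public API yet:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.ObjectMetadata;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.util.WriteAheadLog;
import org.apache.spark.streaming.util.WriteAheadLogRecordHandle;

// Handle that only remembers the S3 key of the object holding one record.
class S3RecordHandle extends WriteAheadLogRecordHandle {
    final String key;
    S3RecordHandle(String key) { this.key = key; }
}

public class S3ObjectPerWriteWAL extends WriteAheadLog {

    private final AmazonS3 s3 = new AmazonS3Client();
    private final String bucket = "my-wal-bucket";            // illustrative
    private final String prefix;
    private final List<String> keyIndex = new ArrayList<>();  // real code: external index (e.g. DynamoDB)

    // Plugged in via spark.streaming.receiver.writeAheadLog.class (not public API);
    // WriteAheadLogUtils appears to instantiate the class with (SparkConf, logDirectory).
    public S3ObjectPerWriteWAL(SparkConf conf, String logDirectory) {
        this.prefix = logDirectory;
    }

    @Override
    public WriteAheadLogRecordHandle write(ByteBuffer record, long time) {
        byte[] bytes = new byte[record.remaining()];
        record.get(bytes);
        String key = prefix + "/log-" + time + "-" + UUID.randomUUID();
        ObjectMetadata meta = new ObjectMetadata();
        meta.setContentLength(bytes.length);
        // One PUT per write() call: the object is durable once putObject returns.
        s3.putObject(bucket, key, new ByteArrayInputStream(bytes), meta);
        keyIndex.add(key);
        return new S3RecordHandle(key);
    }

    @Override
    public ByteBuffer read(WriteAheadLogRecordHandle handle) {
        return ByteBuffer.wrap(readObject(((S3RecordHandle) handle).key));
    }

    @Override
    public Iterator<ByteBuffer> readAll() {
        // S3 LIST is eventually consistent, so keys come from the index, not from listing.
        List<ByteBuffer> records = new ArrayList<>();
        for (String key : keyIndex) {
            records.add(ByteBuffer.wrap(readObject(key)));
        }
        return records.iterator();
    }

    @Override
    public void clean(long threshTime, boolean waitForCompletion) {
        // Delete objects (and index entries) whose timestamp is older than threshTime.
    }

    @Override
    public void close() { }

    private byte[] readObject(String key) {
        try (InputStream in = s3.getObject(bucket, key).getObjectContent()) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buffer = new byte[8192];
            int n;
            while ((n = in.read(buffer)) != -1) {
                out.write(buffer, 0, n);
            }
            return out.toByteArray();
        } catch (IOException e) {
            throw new RuntimeException("Could not read WAL record " + key, e);
        }
    }
}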



On 23 September 2015 at 02:10, Tathagata Das  wrote:

> Responses inline.
>
>
> On Tue, Sep 22, 2015 at 8:35 PM, Michal Čizmazia 
> wrote:
>
>> Can checkpoints be stored to S3 (via S3/S3A Hadoop URL)?
>>
>> Yes. Because checkpoints are single files by themselves, and do not require
> flush semantics to work. So S3 is fine.
>
>
>
>> Trying to answer this question, I looked into
>> Checkpoint.getCheckpointFiles [1]. It is doing findFirstIn which would
>> probably be calling the S3 LIST operation. S3 LIST is prone to eventual
>> consistency [2]. What would happen when getCheckpointFiles retrieves an
>> incomplete list of files to Checkpoint.read [1]?
>>
>> There is a non-zero chance of that happening. But in that case it will
> just use an older checkpoint file to recover the DAG of DStreams. That just
> means that it will recover at an earlier point in time, and undergo more
> computation.
>
>
>
>> The pluggable WAL interface allows me to work around the eventual
>> consistency of S3 by storing an index of filenames in DynamoDB. However it
>> seems that something similar is required for checkpoints as well.
>>
>
> How are you getting around the fact that flush does not work in S3? So
> until the current WAL file is closed, the file is not readable, even if you
> know the index.
> This should not be needed for checkpoints because of the reason I mentioned
> above in this mail.
>
>
>>
>> I am implementing a Reliable Receiver for Amazon SQS. Alternatively, is
>> there something I can borrow from DirectKafkaInputDStream? After a DStream
>> computes an RDD, is there a way for the DStream to tell when processing of
>> that RDD has been finished and only after that delete the SQS messages.
>>
>
> You could borrow from Direct Kafka! For that to work, you should be able
> to do the following.
> 1. Each message in SQS should have a unique identifier, using which you
> can specify ranges of messages to read.
>
> 2.  You should be able to query from SQS the identifier of the latest
> message, so that you can decide the range to read -- last read message to
> latest message
>
> 3. There must be a way to find out identifier of the Nth record from the
> current record. This is necessary for rate limiting -- if you want to read
> at most 1000 messages in each interval, and you have read till ID X, then
> you should be able to find out (without reading the data) the ID of the (X +
> 1000)th record. This is possible in Kafka, as offsets are continuous, but
> not possible in Kinesis as sequence numbers are not continuous numbers.
>
> I am not sure SQS satisfies these properties. If it satisfies 1 and 2, but
> not 3, then you can consider looking at the Kinesis Receiver in Spark 1.5,
> which still uses receivers, but keeps track of Kinesis sequence numbers in
> the metadata WAL.
>
>
>>
>> I was also considering Amazon EFS, but it is only available in a single
>> region for a preview. EBS could be an option, but it cannot be used across
>> multiple Availability Zones.
>>
>> [1]:
>> https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
>> [2]:
>> http://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html#ConsistencyModel
>>
>>
>>
>> On 22 September 2015 at 21:09, Tathagata Das  wrote:
>>
>>> You can keep the checkpoints in the Hadoop-compatible file system and
>>> the WAL somewhere else using your custom WAL implementation. Yes, cleaning
>>> up the stuff gets complicated as it is not as easy as deleting off the
>>> checkpoint directory - you will have to clean up checkpoint directory as
>>> well as the whatever other storage that your custom WAL uses. However, if I
>>> remember correctly, the WAL information is used only when the Dstreams are
>>> recovered correctly from checkpoints.
>>>
>>> Note that, there are further details here that require deeper
>>> understanding. There are actually two uses of WALs in the system -
>>>
>>> 1. Data WAL for received data  - This is what is usually referred to as
>>> the WAL everywhere. Each receiver writes to a different WAL. This deals
>>> with bulk data.
>>> 2. Metadata WAL - This is used by the driver to save metadata
>>> information like  block to data WAL segment mapping, etc. I usually skip
>>> mentioning this. This WAL is automatically used when data WAL is enabled.
>>> And this deals with small data.

Re: WAL on S3

2015-09-22 Thread Michal Čizmazia
Can checkpoints be stored to S3 (via S3/S3A Hadoop URL)?

Trying to answer this question, I looked into Checkpoint.getCheckpointFiles
[1]. It is doing findFirstIn which would probably be calling the S3 LIST
operation. S3 LIST is prone to eventual consistency [2]. What would happen
when getCheckpointFiles retrieves an incomplete list of files to
Checkpoint.read [1]?

The pluggable WAL interface allows me to work around the eventual
consistency of S3 by storing an index of filenames in DynamoDB. However it
seems that something similar is required for checkpoints as well.

I am implementing a Reliable Receiver for Amazon SQS. Alternatively, is
there something I can borrow from DirectKafkaInputDStream? After a DStream
computes an RDD, is there a way for the DStream to tell when processing of
that RDD has been finished and only after that delete the SQS messages.

I was also considering Amazon EFS, but it is only available in a single
region for a preview. EBS could be an option, but it cannot be used across
multiple Availability Zones.

[1]:
https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
[2]:
http://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html#ConsistencyModel



On 22 September 2015 at 21:09, Tathagata Das  wrote:

> You can keep the checkpoints in the Hadoop-compatible file system and the
> WAL somewhere else using your custom WAL implementation. Yes, cleaning up
> the stuff gets complicated as it is not as easy as deleting off the
> checkpoint directory - you will have to clean up checkpoint directory as
> well as the whatever other storage that your custom WAL uses. However, if I
> remember correctly, the WAL information is used only when the Dstreams are
> recovered correctly from checkpoints.
>
> Note that, there are further details here that require deeper
> understanding. There are actually two uses of WALs in the system -
>
> 1. Data WAL for received data  - This is what is usually referred to as
> the WAL everywhere. Each receiver writes to a different WAL. This deals
> with bulk data.
> 2. Metadata WAL - This is used by the driver to save metadata information
> like  block to data WAL segment mapping, etc. I usually skip mentioning
> this. This WAL is automatically used when data WAL is enabled. And this
> deals with small data.
>
> If you have to get around S3's limitations, you will have to plugin both
> WALs (see this
> <https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala>
> for SparkConfs, but note that we haven't made these confs public). While the
> system supports plugging them in, we haven't made this information public
> yet because of such complexities in working with it.  And we have invested
> time in making common sources like Kafka not require WALs (e.g. Direct
> Kafka  approach). In future, we do hope to have a better solution for
> general receivers + WALs + S3 (personally, I really wish S3's semantics
> improve and fixes this issue).
>
> Another alternative direction may be Amazon EFS. Since it is based on EBS, it
> may give the necessary semantics. But I haven't given that a spin, so it's
> uncharted territory :)
>
> TD
>
>
> On Tue, Sep 22, 2015 at 5:15 PM, Michal Čizmazia 
> wrote:
>
>> My understanding of pluggable WAL was that it eliminates the need for
>> having a Hadoop-compatible file system [1].
>>
>> What is the use of pluggable WAL when it can only be used together with
>> checkpointing, which still requires a Hadoop-compatible file system?
>>
>> [1]: https://issues.apache.org/jira/browse/SPARK-7056
>>
>>
>>
>> On 22 September 2015 at 19:57, Tathagata Das  wrote:
>>
>>> 1. Currently, the WAL can be used only with checkpointing turned on,
>>> because it does not make sense to recover from WAL if there is no
>>> checkpoint information to recover from.
>>>
>>> 2. Since the current implementation saves the WAL in the checkpoint
>>> directory, they share the fate -- if checkpoint directory is deleted, then
>>> both checkpoint info and WAL info is deleted.
>>>
>>> 3. Checkpointing is currently not pluggable. Why do you want that?
>>>
>>>
>>>
>>> On Tue, Sep 22, 2015 at 4:53 PM, Michal Čizmazia 
>>> wrote:
>>>
>>>> I am trying to use pluggable WAL, but it can be used only with
>>>> checkpointing turned on. Thus I still need to have a Hadoop-compatible file
>>>> system.
>>>>
>>>> Is there something like pluggable checkpointing?
>>>>
>>>> Or can WAL be used without checkpointing? What happens when

Re: WAL on S3

2015-09-22 Thread Michal Čizmazia
My understanding of pluggable WAL was that it eliminates the need for
having a Hadoop-compatible file system [1].

What is the use of pluggable WAL when it can only be used together with
checkpointing, which still requires a Hadoop-compatible file system?

[1]: https://issues.apache.org/jira/browse/SPARK-7056



On 22 September 2015 at 19:57, Tathagata Das 
wrote:

> 1. Currently, the WAL can be used only with checkpointing turned on,
> because it does not make sense to recover from WAL if there is no
> checkpoint information to recover from.
>
> 2. Since the current implementation saves the WAL in the checkpoint
> directory, they share the fate -- if checkpoint directory is deleted, then
> both checkpoint info and WAL info is deleted.
>
> 3. Checkpointing is currently not pluggable. Why do you want that?
>
>
>
> On Tue, Sep 22, 2015 at 4:53 PM, Michal Čizmazia 
> wrote:
>
>> I am trying to use pluggable WAL, but it can be used only with
>> checkpointing turned on. Thus I still need to have a Hadoop-compatible file
>> system.
>>
>> Is there something like pluggable checkpointing?
>>
>> Or can WAL be used without checkpointing? What happens when WAL is
>> available but the checkpoint directory is lost?
>>
>> Thanks!
>>
>>
>> On 18 September 2015 at 05:47, Tathagata Das  wrote:
>>
>>> I don't think it would work with multipart upload either. The file is not
>>> visible until the multipart upload is explicitly closed. So even if each
>>> write is a part upload, all the parts are not visible until the multipart
>>> upload is closed.
>>>
>>> TD
>>>
>>> On Fri, Sep 18, 2015 at 1:55 AM, Steve Loughran 
>>> wrote:
>>>
>>>>
>>>> > On 17 Sep 2015, at 21:40, Tathagata Das  wrote:
>>>> >
>>>> > Actually, the current WAL implementation (as of Spark 1.5) does not
>>>> work with S3 because S3 does not support flushing. Basically, the current
>>>> implementation assumes that after write + flush, the data is immediately
>>>> durable, and readable if the system crashes without closing the WAL file.
>>>> This does not work with S3 as data is durable if and only if the S3 file
>>>> output stream is cleanly closed.
>>>> >
>>>>
>>>>
>>>> more precisely, unless you turn multipart uploads on, the S3n/s3a
>>>> clients Spark uses *don't even upload anything to S3*.
>>>>
>>>> It's not a filesystem, and you have to bear that in mind.
>>>>
>>>> Amazon's own s3 client used in EMR behaves differently; it may be
>>>> usable as a destination (I haven't tested)
>>>>
>>>>
>>>
>>
>


Re: WAL on S3

2015-09-22 Thread Michal Čizmazia
I am trying to use pluggable WAL, but it can be used only with
checkpointing turned on. Thus I still need to have a Hadoop-compatible file
system.

Is there something like pluggable checkpointing?

Or can WAL be used without checkpointing? What happens when WAL is
available but the checkpoint directory is lost?

Thanks!


On 18 September 2015 at 05:47, Tathagata Das  wrote:

> I don't think it would work with multipart upload either. The file is not
> visible until the multipart upload is explicitly closed. So even if each
> write is a part upload, all the parts are not visible until the multipart
> upload is closed.
>
> TD
>
> On Fri, Sep 18, 2015 at 1:55 AM, Steve Loughran 
> wrote:
>
>>
>> > On 17 Sep 2015, at 21:40, Tathagata Das  wrote:
>> >
>> > Actually, the current WAL implementation (as of Spark 1.5) does not
>> work with S3 because S3 does not support flushing. Basically, the current
>> implementation assumes that after write + flush, the data is immediately
>> durable, and readable if the system crashes without closing the WAL file.
>> This does not work with S3 as data is durable if and only if the S3 file
>> output stream is cleanly closed.
>> >
>>
>>
>> more precisely, unless you turn multipart uploads on, the S3n/s3a
>> clients Spark uses *don't even upload anything to S3*.
>>
>> It's not a filesystem, and you have to bear that in mind.
>>
>> Amazon's own s3 client used in EMR behaves differently; it may be usable
>> as a destination (I haven't tested)
>>
>>
>


Re: KafkaDirectStream can't be recovered from checkpoint

2015-09-18 Thread Michal Čizmazia
Hi Petr, after Ctrl+C can you see the following message in the logs?

Invoking stop(stopGracefully=false)

Details:
https://github.com/apache/spark/pull/6307


On 18 September 2015 at 10:28, Petr Novak  wrote:

> It might be connected with my problems with gracefulShutdown in Spark
> 1.5.0 2.11
> https://mail.google.com/mail/#search/petr/14fb6bd5166f9395
>
> Maybe Ctrl+C corrupts checkpoints and breaks gracefulShutdown?
>
> Petr
>
> On Fri, Sep 18, 2015 at 4:10 PM, Petr Novak  wrote:
>
>> ...to ensure it is not something wrong on my cluster.
>>
>> On Fri, Sep 18, 2015 at 4:09 PM, Petr Novak  wrote:
>>
>>> I have tried it on Spark 1.3.0 2.10 and it works. The same code doesn't
>>> on Spark 1.5.0 2.11. It would be nice if anybody could try on another
>>> installation to ensure it is something wrong on my cluster.
>>>
>>> Many thanks,
>>> Petr
>>>
>>> On Fri, Sep 18, 2015 at 4:07 PM, Petr Novak 
>>> wrote:
>>>
 This one is generated, I suppose, after Ctrl+C

 15/09/18 14:38:25 INFO Worker: Asked to kill executor
 app-20150918143823-0001/0
 15/09/18 14:38:25 INFO Worker: Asked to kill executor
 app-20150918143823-0001/0
 15/09/18 14:38:25 DEBUG
 AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: [actor] handled
 message (0.568753 ms) AkkaMessage(KillExecutor(#,false) from
 Actor[akka://sparkWorker/deadLetters]
 15/09/18 14:38:25 DEBUG
 AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: [actor] handled
 message (0.568753 ms) AkkaMessage(KillExecutor(#,false) from
 Actor[akka://sparkWorker/deadLetters]
 15/09/18 14:38:25 INFO ExecutorRunner: Runner thread for executor
 app-20150918143823-0001/0 interrupted
 15/09/18 14:38:25 INFO ExecutorRunner: Runner thread for executor
 app-20150918143823-0001/0 interrupted
 15/09/18 14:38:25 INFO ExecutorRunner: Killing process!
 15/09/18 14:38:25 INFO ExecutorRunner: Killing process!
 15/09/18 14:38:25 ERROR FileAppender: Error writing stream to file
 /dfs/spark/work/app-20150918143823-0001/0/stderr
 java.io.IOException: Stream closed
 at
 java.io.BufferedInputStream.getBufIfOpen(BufferedInputStream.java:162)
 at java.io.BufferedInputStream.read1(BufferedInputStream.java:272)
 at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
 at java.io.FilterInputStream.read(FilterInputStream.java:107)
 at
 org.apache.spark.util.logging.FileAppender.appendStreamToFile(FileAppender.scala:70)
 at
 org.apache.spark.util.logging.FileAppender$$anon$1$$anonfun$run$1.apply$mcV$sp(FileAppender.scala:39)
 at
 org.apache.spark.util.logging.FileAppender$$anon$1$$anonfun$run$1.apply(FileAppender.scala:39)
 at
 org.apache.spark.util.logging.FileAppender$$anon$1$$anonfun$run$1.apply(FileAppender.scala:39)
 at
 org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
 at
 org.apache.spark.util.logging.FileAppender$$anon$1.run(FileAppender.scala:38)
 15/09/18 14:38:25 ERROR FileAppender: Error writing stream to file
 /dfs/spark/work/app-20150918143823-0001/0/stderr
 java.io.IOException: Stream closed
 at
 java.io.BufferedInputStream.getBufIfOpen(BufferedInputStream.java:162)
 at java.io.BufferedInputStream.read1(BufferedInputStream.java:272)
 at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
 at java.io.FilterInputStream.read(FilterInputStream.java:107)
 at
 org.apache.spark.util.logging.FileAppender.appendStreamToFile(FileAppender.scala:70)
 at
 org.apache.spark.util.logging.FileAppender$$anon$1$$anonfun$run$1.apply$mcV$sp(FileAppender.scala:39)
 at
 org.apache.spark.util.logging.FileAppender$$anon$1$$anonfun$run$1.apply(FileAppender.scala:39)
 at
 org.apache.spark.util.logging.FileAppender$$anon$1$$anonfun$run$1.apply(FileAppender.scala:39)
 at
 org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
 at
 org.apache.spark.util.logging.FileAppender$$anon$1.run(FileAppender.scala:38)
 15/09/18 14:38:25 DEBUG FileAppender: Closed file
 /dfs/spark/work/app-20150918143823-0001/0/stderr
 15/09/18 14:38:25 DEBUG FileAppender: Closed file
 /dfs/spark/work/app-20150918143823-0001/0/stderr

 Petr

>>>
>>>
>>
>


Re: Checkpointing with Kinesis

2015-09-18 Thread Michal Čizmazia
FYI re WAL on S3

http://search-hadoop.com/m/q3RTtFMpd41A7TnH/WAL+S3&subj=WAL+on+S3



On 18 September 2015 at 13:32, Alan Dipert  wrote:

> Hello,
>
> Thanks all for considering our problem.  We are doing transformations in
> Spark Streaming.  We have also since learned that WAL to S3 on 1.4 is "not
> reliable" [1]
>
> We are just going to wait for EMR to support 1.5 and hopefully this won't
> be a problem anymore [2].
>
> Alan
>
> 1.
> https://mail-archives.apache.org/mod_mbox/spark-user/201508.mbox/%3CCA+AHuKkH9r0BwQMgQjDG+j=qdcqzpow1rw1u4d0nrcgmq5x...@mail.gmail.com%3E
> 2. https://issues.apache.org/jira/browse/SPARK-9215
>
> On Fri, Sep 18, 2015 at 4:23 AM, Nick Pentreath 
> wrote:
>
>> Are you doing actual transformations / aggregation in Spark Streaming? Or
>> just using it to bulk write to S3?
>>
>> If the latter, then you could just use your AWS Lambda function to read
>> directly from the Kinesis stream. If the former, then perhaps either look
>> into the WAL option that Aniket mentioned, or perhaps you could write the
>> processed RDD back to Kinesis, and have the Lambda function read the
>> Kinesis stream and write to Redshift?
>>
>> On Thu, Sep 17, 2015 at 5:48 PM, Alan Dipert  wrote:
>>
>>> Hello,
>>> We are using Spark Streaming 1.4.1 in AWS EMR to process records from
>>> Kinesis.  Our Spark program saves RDDs to S3, after which the records are
>>> picked up by a Lambda function that loads them into Redshift.  That no data
>>> is lost during processing is important to us.
>>>
>>> We have set our Kinesis checkpoint interval to 15 minutes, which is also
>>> our window size.
>>>
>>> Unfortunately, checkpointing happens after receiving data from Kinesis,
>>> not after we have successfully written to S3.  If batches back up in Spark,
>>> and the cluster is terminated, whatever data was in-memory will be lost
>>> because it was checkpointed but not actually saved to S3.
>>>
>>> We are considering forking and modifying the kinesis-asl library with
>>> changes that would allow us to perform the checkpoint manually and at the
>>> right time.  We'd rather not do this.
>>>
>>> Are we overlooking an easier way to deal with this problem?  Thank you
>>> in advance for your insight!
>>>
>>> Alan
>>>
>>
>>
>


Re: WAL on S3

2015-09-17 Thread Michal Čizmazia
Please could you explain how to use pluggable WAL?

After I implement the WriteAheadLog abstract class, how can I use it? I
want to use it with a Custom Reliable Receiver. I am using Spark 1.4.1.

Thanks!


On 17 September 2015 at 16:40, Tathagata Das  wrote:

> Actually, the current WAL implementation (as of Spark 1.5) does not work
> with S3 because S3 does not support flushing. Basically, the current
> implementation assumes that after write + flush, the data is immediately
> durable, and readable if the system crashes without closing the WAL file.
> This does not work with S3 as data is durable if and only if the S3 file
> output stream is cleanly closed.
>
>
>
>
> On Thu, Sep 17, 2015 at 1:30 PM, Ted Yu  wrote:
>
>> I assume you don't use Kinesis.
>>
>> Are you running Spark 1.5.0 ?
>> If you must use S3, is switching to Kinesis possible ?
>>
>> Cheers
>>
>> On Thu, Sep 17, 2015 at 1:09 PM, Michal Čizmazia 
>> wrote:
>>
>>> How to make Write Ahead Logs work with S3? Any pointers welcome!
>>>
>>> It seems to be a known issue:
>>> https://issues.apache.org/jira/browse/SPARK-9215
>>>
>>> I am getting this exception when reading write ahead log:
>>>
>>> Exception in thread "main" org.apache.spark.SparkException: Job aborted
>>> due to stage failure: Task 0 in stage 5.0 failed 1 times, most recent
>>> failure: Lost task 0.0 in stage 5.0 (TID 14, localhost):
>>> org.apache.spark.SparkException: Could not read data from write ahead log
>>> record
>>> FileBasedWriteAheadLogSegment(s3ax://test/test/0/log-1442512871968-1442512931968,0,1721)
>>> at
>>> org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org
>>> $apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:144)
>>> at
>>> org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.compute(WriteAheadLogBackedBlockRDD.scala:170)
>>> at
>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>> at
>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>> at
>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>> at
>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
>>> at
>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>> at org.apache.spark.scheduler.Task.run(Task.scala:70)
>>> at
>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>> at java.lang.Thread.run(Thread.java:745)
>>> Caused by: java.lang.NullPointerException
>>> at
>>> org.apache.spark.streaming.util.FileBasedWriteAheadLog.read(FileBasedWriteAheadLog.scala:106)
>>> at
>>> org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org
>>> $apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:141)
>>> ... 13 more
>>>
>>>
>>>
>>
>


WAL on S3

2015-09-17 Thread Michal Čizmazia
How to make Write Ahead Logs work with S3? Any pointers welcome!

It seems to be a known issue: https://issues.apache.org/jira/browse/SPARK-9215

I am getting this exception when reading write ahead log:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due
to stage failure: Task 0 in stage 5.0 failed 1 times, most recent failure:
Lost task 0.0 in stage 5.0 (TID 14, localhost):
org.apache.spark.SparkException: Could not read data from write ahead log
record
FileBasedWriteAheadLogSegment(s3ax://test/test/0/log-1442512871968-1442512931968,0,1721)
at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org
$apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:144)
at
org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.compute(WriteAheadLogBackedBlockRDD.scala:170)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
at
org.apache.spark.streaming.util.FileBasedWriteAheadLog.read(FileBasedWriteAheadLog.scala:106)
at org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD.org
$apache$spark$streaming$rdd$WriteAheadLogBackedBlockRDD$$getBlockFromWriteAheadLog$1(WriteAheadLogBackedBlockRDD.scala:141)
... 13 more


Managing scheduling delay in Spark Streaming

2015-09-15 Thread Michal Čizmazia
Hi,

I have a Reliable Custom Receiver storing messages into Spark. Is there a way
to prevent my receiver from storing more messages into Spark when the
Scheduling Delay reaches a certain threshold?

Possible approaches:
#1 Does Spark block on the Receiver.store(messages) call to prevent storing
more messages and overflowing the system?
#2 How can I obtain the Scheduling Delay in the Custom Receiver, so that I can
implement this myself? (A listener sketch follows below.)
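
For #2, one option seems to be a StreamingListener that records the delay; a
minimal sketch (callback list as of Spark 1.4, so all methods of the trait are
stubbed when implementing it from Java; note the listener runs in the driver,
so in cluster mode the value would still have to be forwarded to the receiver,
e.g. via an external store):

import java.util.concurrent.atomic.AtomicLong;

import org.apache.spark.streaming.scheduler.StreamingListener;
import org.apache.spark.streaming.scheduler.StreamingListenerBatchCompleted;
import org.apache.spark.streaming.scheduler.StreamingListenerBatchStarted;
import org.apache.spark.streaming.scheduler.StreamingListenerBatchSubmitted;
import org.apache.spark.streaming.scheduler.StreamingListenerReceiverError;
import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStarted;
import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStopped;

// Records the most recent scheduling delay observed on the driver.
public class SchedulingDelayListener implements StreamingListener {

    public static final AtomicLong lastDelayMs = new AtomicLong(0);

    @Override
    public void onBatchCompleted(StreamingListenerBatchCompleted batchCompleted) {
        scala.Option<Object> delay = batchCompleted.batchInfo().schedulingDelay();
        if (delay.isDefined()) {
            lastDelayMs.set((Long) delay.get());
        }
    }

    // Remaining callbacks of the Spark 1.4 trait are no-ops.
    @Override public void onReceiverStarted(StreamingListenerReceiverStarted started) { }
    @Override public void onReceiverError(StreamingListenerReceiverError error) { }
    @Override public void onReceiverStopped(StreamingListenerReceiverStopped stopped) { }
    @Override public void onBatchSubmitted(StreamingListenerBatchSubmitted submitted) { }
    @Override public void onBatchStarted(StreamingListenerBatchStarted started) { }
}

// Registration on the driver:
// jssc.ssc().addStreamingListener(new SchedulingDelayListener());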

Thanks,

Mike


Re: Graceful shutdown for Spark Streaming

2015-08-10 Thread Michal Čizmazia
From the logs, it seems that Spark Streaming does handle *kill -SIGINT* with
graceful shutdown.

Please could you confirm?

Thanks!
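
For reference, a minimal way to make this explicit, rather than relying on the
default shutdown hook, is to register one that stops the context gracefully; a
sketch only (checkpointDir and factory as in the earlier snippets; Spark 1.4+
also appears to have a spark.streaming.stopGracefullyOnShutdown setting, which
I have not verified here):

import org.apache.spark.streaming.api.java.JavaStreamingContext;

final JavaStreamingContext jssc =
    JavaStreamingContext.getOrCreate(checkpointDir, factory);

// On SIGINT/SIGTERM the JVM runs shutdown hooks, so the context is stopped
// gracefully before the process exits.
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    jssc.stop(true, true);   // stopSparkContext = true, stopGracefully = true
}));

jssc.start();
jssc.awaitTermination();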

On 30 July 2015 at 08:19, anshu shukla  wrote:

> Yes, I was doing the same. If you mean that this is the correct way to do it,
> then I will verify it once more in my case.
>
> On Thu, Jul 30, 2015 at 1:02 PM, Tathagata Das 
> wrote:
>
>> How is sleep not working? Are you doing
>>
>> streamingContext.start()
>> Thread.sleep(xxx)
>> streamingContext.stop()
>>
>> On Wed, Jul 29, 2015 at 6:55 PM, anshu shukla 
>> wrote:
>>
>>> If we want to stop the application after a fixed time period, how will it
>>> work? (How to give the duration in the logic? In my case sleep(t.s.) is not
>>> working.) So I used to kill the CoarseGrained job at each slave by script.
>>> Please suggest something.
>>>
>>> On Thu, Jul 30, 2015 at 5:14 AM, Tathagata Das 
>>> wrote:
>>>
>>>> StreamingContext.stop(stopGracefully = true) stops the streaming
>>>> context gracefully.
>>>> Then you can safely terminate the Spark cluster. They are two different
>>>> steps and need to be done separately, ensuring that the driver process has
>>>> been completely terminated before the Spark cluster is terminated.
>>>>
>>>> On Wed, Jul 29, 2015 at 6:43 AM, Michal Čizmazia 
>>>> wrote:
>>>>
>>>>> How to initiate graceful shutdown from outside of the Spark Streaming
>>>>> driver process? Both for the local and cluster mode of Spark Standalone as
>>>>> well as EMR.
>>>>>
>>>>> Does sbin/stop-all.sh stop the context gracefully? How is it done? Is
>>>>> there a signal sent to the driver process?
>>>>>
>>>>> For EMR, is there a way how to terminate an EMR cluster with Spark
>>>>> Streaming graceful shutdown?
>>>>>
>>>>> Thanks!
>>>>>
>>>>>
>>>>>
>>>>
>>>
>>>
>>> --
>>> Thanks & Regards,
>>> Anshu Shukla
>>>
>>
>>
>
>
> --
> Thanks & Regards,
> Anshu Shukla
>


Re: How do I Process Streams that span multiple lines?

2015-08-03 Thread Michal Čizmazia
Sorry.

SparkContext.wholeTextFiles

Not sure about streams.

On 3 August 2015 at 14:50, Michal Čizmazia  wrote:

> Are you looking for RDD.wholeTextFiles?
>
> On 3 August 2015 at 10:57, Spark Enthusiast 
> wrote:
>
>> All  examples of Spark Stream programming that I see assume streams of
>> lines that are then tokenised and acted upon (like the WordCount example).
>>
>> How do I process Streams that span multiple lines? Are there examples
>> that I can use?
>>
>
>


Re: How do I Process Streams that span multiple lines?

2015-08-03 Thread Michal Čizmazia
Are you looking for RDD.wholeTextFiles?

On 3 August 2015 at 10:57, Spark Enthusiast 
wrote:

> All  examples of Spark Stream programming that I see assume streams of
> lines that are then tokenised and acted upon (like the WordCount example).
>
> How do I process Streams that span multiple lines? Are there examples that
> I can use?
>


Graceful shutdown for Spark Streaming

2015-07-29 Thread Michal Čizmazia
How to initiate graceful shutdown from outside of the Spark Streaming
driver process? Both for the local and cluster mode of Spark Standalone as
well as EMR.

Does sbin/stop-all.sh stop the context gracefully? How is it done? Is there
a signal sent to the driver process?

For EMR, is there a way how to terminate an EMR cluster with Spark
Streaming graceful shutdown?

Thanks!


Re: Parallelism of Custom receiver in spark

2015-07-26 Thread Michal Čizmazia
#1 see
https://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving

#2 By default, all input data and persisted RDDs generated by DStream
transformations are automatically cleared. Spark Streaming decides when to
clear the data based on the transformations that are used. See
https://spark.apache.org/docs/latest/streaming-programming-guide.html#memory-tuning

Hope this helps.
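
To make #1 concrete, a minimal sketch of running several copies of a custom
receiver and unioning the resulting streams (jssc is the JavaStreamingContext;
MyCustomReceiver and the count of 4 are illustrative):

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.streaming.api.java.JavaDStream;

// each receiver instance occupies one core on an executor
List<JavaDStream<String>> streams = new ArrayList<>();
for (int i = 0; i < 4; i++) {
    streams.add(jssc.receiverStream(new MyCustomReceiver()));
}
JavaDStream<String> unified =
    jssc.union(streams.get(0), streams.subList(1, streams.size()));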




On 25 July 2015 at 13:43, anshu shukla  wrote:

> 1 - How to increase the level of *parallelism in a Spark Streaming custom
> RECEIVER*?
>
> 2 - Will ssc.receiverStream(...) *delete the data stored in Spark memory
> using the store() logic*?
>
> --
> Thanks & Regards,
> Anshu Shukla
>


Re: work around Size exceeds Integer.MAX_VALUE

2015-07-09 Thread Michal Čizmazia
Thanks Matei! It worked.
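
For reference, a minimal sketch of the fix Matei describes below (the partition
counts are illustrative; the point is to keep every cached partition under 2 GB):

// read with more partitions up front (second argument is minPartitions)...
JavaRDD<String> lines = sc.textFile("s3n://my-bucket/input", 400);

// ...or repartition before caching
JavaRDD<String> repartitioned = lines.repartition(400);
repartitioned.persist(StorageLevel.MEMORY_AND_DISK());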

On 9 July 2015 at 19:43, Matei Zaharia  wrote:

> This means that one of your cached RDD partitions is bigger than 2 GB of
> data. You can fix it by having more partitions. If you read data from a
> file system like HDFS or S3, set the number of partitions higher in the
> sc.textFile, hadoopFile, etc methods (it's an optional second parameter to
> those methods). If you create it through parallelize or if this particular
> RDD comes from a shuffle, use more tasks in the parallelize or shuffle.
>
> Matei
>
> On Jul 9, 2015, at 3:35 PM, Michal Čizmazia  wrote:
>
> Spark version 1.4.0 in the Standalone mode
>
> 2015-07-09 20:12:02 INFO  (sparkDriver-akka.actor.default-dispatcher-3)
> BlockManagerInfo:59 - Added rdd_0_0 on disk on localhost:51132 (size: 29.8
> GB)
> 2015-07-09 20:12:02 ERROR (Executor task launch worker-0) Executor:96 -
> Exception in task 0.0 in stage 0.0 (TID 0)
> java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
> at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:836)
> at
> org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:125)
> at
> org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:113)
> at
> org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1285)
> at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:127)
> at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:134)
> at
> org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:509)
> at
> org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:427)
> at
> org.apache.spark.storage.BlockManager.get(BlockManager.scala:615)
> at
> org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:154)
> at
> org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
> at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> at
> org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
> at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> at
> org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:70)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
>
> On 9 July 2015 at 18:11, Ted Yu  wrote:
>
>> Which release of Spark are you using ?
>>
>> Can you show the complete stack trace ?
>>
>> getBytes() could be called from:
>> getBytes(file, 0, file.length)
>> or:
>> getBytes(segment.file, segment.offset, segment.length)
>>
>> Cheers
>>
>> On Thu, Jul 9, 2015 at 2:50 PM, Michal Čizmazia 
>> wrote:
>>
>>> Please could anyone give me pointers for appropriate SparkConf to work
>>> around "Size exceeds Integer.MAX_VALUE"?
>>>
>>> Stacktrace:
>>>
>>> 2015-07-09 20:12:02 INFO  (sparkDriver-akka.actor.default-dispatcher-3)
>>> BlockManagerInfo:59 - Added rdd_0_0 on disk on localhost:51132 (size: 29.8
>>> GB)
>>> 2015-07-09 20:12:02 ERROR (Executor task launch worker-0) Executor:96 -
>>> Exception in task 0.0 in stage 0.0 (TID 0)
>>> java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
>>> at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:836)
>>> at
>>> org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:125)
>>> ...
>>>
>>>
>>
>
>


Re: work around Size exceeds Integer.MAX_VALUE

2015-07-09 Thread Michal Čizmazia
Spark version 1.4.0 in the Standalone mode

2015-07-09 20:12:02 INFO  (sparkDriver-akka.actor.default-dispatcher-3)
BlockManagerInfo:59 - Added rdd_0_0 on disk on localhost:51132 (size: 29.8
GB)
2015-07-09 20:12:02 ERROR (Executor task launch worker-0) Executor:96 -
Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:836)
at
org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:125)
at
org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:113)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1285)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:127)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:134)
at
org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:509)
at
org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:427)
at org.apache.spark.storage.BlockManager.get(BlockManager.scala:615)
at
org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:154)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)


On 9 July 2015 at 18:11, Ted Yu  wrote:

> Which release of Spark are you using ?
>
> Can you show the complete stack trace ?
>
> getBytes() could be called from:
> getBytes(file, 0, file.length)
> or:
> getBytes(segment.file, segment.offset, segment.length)
>
> Cheers
>
> On Thu, Jul 9, 2015 at 2:50 PM, Michal Čizmazia  wrote:
>
>> Please could anyone give me pointers for appropriate SparkConf to work
>> around "Size exceeds Integer.MAX_VALUE"?
>>
>> Stacktrace:
>>
>> 2015-07-09 20:12:02 INFO  (sparkDriver-akka.actor.default-dispatcher-3)
>> BlockManagerInfo:59 - Added rdd_0_0 on disk on localhost:51132 (size: 29.8
>> GB)
>> 2015-07-09 20:12:02 ERROR (Executor task launch worker-0) Executor:96 -
>> Exception in task 0.0 in stage 0.0 (TID 0)
>> java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
>> at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:836)
>> at
>> org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:125)
>> ...
>>
>>
>


work around Size exceeds Integer.MAX_VALUE

2015-07-09 Thread Michal Čizmazia
Please could anyone give me pointers for appropriate SparkConf to work
around "Size exceeds Integer.MAX_VALUE"?

Stacktrace:

2015-07-09 20:12:02 INFO  (sparkDriver-akka.actor.default-dispatcher-3)
BlockManagerInfo:59 - Added rdd_0_0 on disk on localhost:51132 (size: 29.8
GB)
2015-07-09 20:12:02 ERROR (Executor task launch worker-0) Executor:96 -
Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:836)
at
org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:125)
...


Re: change default storage level

2015-07-09 Thread Michal Čizmazia
Thanks Shixiong! Your response helped me to understand the role of
persist(). Indeed, no persist() calls were required. I solved my problem by
setting spark.local.dir to allow more space for the Spark temporary folder. It
works automatically. I am seeing logs like this:

Not enough space to cache rdd_0_1 in memory!
Persisting partition rdd_0_1 to disk instead.

Before I was getting:

No space left on device
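
For reference, the setting in question; a minimal sketch (the path is
illustrative, it just needs to point at a volume with enough free space):

import org.apache.spark.SparkConf;

SparkConf sparkConf = new SparkConf()
    // scratch space for shuffle files and for partitions persisted/spilled to disk
    .set("spark.local.dir", "/mnt/big-volume/spark-tmp");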


On 9 July 2015 at 11:57, Shixiong Zhu  wrote:

> Spark won't store RDDs to memory unless you use a memory StorageLevel. By
> default, your input and intermediate results won't be put into memory. You
> can call persist if you want to avoid duplicate computation or reading.
> E.g.,
>
> val r1 = context.wholeTextFiles(...)
> val r2 = r1.flatMap(s -> ...)
> val r3 = r2.filter(...)...
> r3.saveAsTextFile(...)
> val r4 = r2.map(...)...
> r4.saveAsTextFile(...)
>
> In the above example, r2 will be used twice. To speed up the computation,
> you can call r2.persist(StorageLevel.MEMORY_ONLY) to store r2 into memory. Then
> r4 will use the data of r2 in memory directly. E.g.,
>
> val r1 = context.wholeTextFiles(...)
> val r2 = r1.flatMap(s -> ...)
> r2.persist(StorageLevel.MEMORY_ONLY)
> val r3 = r2.filter(...)...
> r3.saveAsTextFile(...)
> val r4 = r2.map(...)...
> r4.saveAsTextFile(...)
>
> See
> http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence
>
>
> Best Regards,
> Shixiong Zhu
>
> 2015-07-09 22:09 GMT+08:00 Michal Čizmazia :
>
>> Is there a way to change the default storage level?
>>
>> If not, how can I properly change the storage level wherever necessary,
>> if my input and intermediate results do not fit into memory?
>>
>> In this example:
>>
>> context.wholeTextFiles(...)
>> .flatMap(s -> ...)
>> .flatMap(s -> ...)
>>
>> Does persist() need to be called after every transformation?
>>
>>  context.wholeTextFiles(...)
>> .persist(StorageLevel.MEMORY_AND_DISK)
>> .flatMap(s -> ...)
>> .persist(StorageLevel.MEMORY_AND_DISK)
>> .flatMap(s -> ...)
>> .persist(StorageLevel.MEMORY_AND_DISK)
>>
>>  Thanks!
>>
>>
>


change default storage level

2015-07-09 Thread Michal Čizmazia
Is there a way to change the default storage level?

If not, how can I properly change the storage level wherever necessary, if
my input and intermediate results do not fit into memory?

In this example:

context.wholeTextFiles(...)
.flatMap(s -> ...)
.flatMap(s -> ...)

Does persist() need to be called after every transformation?

 context.wholeTextFiles(...)
.persist(StorageLevel.MEMORY_AND_DISK)
.flatMap(s -> ...)
.persist(StorageLevel.MEMORY_AND_DISK)
.flatMap(s -> ...)
.persist(StorageLevel.MEMORY_AND_DISK)

 Thanks!


Re: Are Spark Streaming RDDs always processed in order?

2015-07-04 Thread Michal Čizmazia
I had a similar inquiry, copied below.

I was also looking into making an SQS Receiver reliable:
http://stackoverflow.com/questions/30809975/reliable-sqs-receiver-for-spark-streaming

Hope this helps.

-- Forwarded message --
From: Tathagata Das 
Date: 20 June 2015 at 17:21
Subject: Re: Serial batching with Spark Streaming
To: Michal Čizmazia 
Cc: Binh Nguyen Van , user 


No it does not. By default, only after all the retries etc. related to batch
X are done will batch X+1 be started.

Yes, one RDD per batch per DStream. However, the RDD could be a union of
multiple RDDs (e.g. RDDs generated by windowed DStream, or unioned
DStream).

TD

On Fri, Jun 19, 2015 at 3:16 PM, Michal Čizmazia  wrote:
Thanks Tathagata!

I will use *foreachRDD*/*foreachPartition*() instead of *transform*() then.

Does the default scheduler initiate the execution of the *batch X+1* after
the *batch X* even if tasks for the *batch X* need to be *retried due to
failures*? If not, please could you suggest workarounds and point me to the
code?

One more thing was not 100% clear to me from the documentation: Is there
exactly *1 RDD* published *per batch interval* in a DStream?


On 3 July 2015 at 22:12, khaledh  wrote:

> I'm writing a Spark Streaming application that uses RabbitMQ to consume
> events. One feature of RabbitMQ that I intend to make use of is bulk ack of
> messages, i.e. no need to ack one-by-one, but only ack the last event in a
> batch and that would ack the entire batch.
>
> Before I commit to doing so, I'd like to know if Spark Streaming always
> processes RDDs in the same order they arrive in, i.e. if RDD1 arrives
> before
> RDD2, is it true that RDD2 will never be scheduled/processed before RDD1 is
> finished?
>
> This is crucial to the ack logic, since if RDD2 can be potentially
> processed
> while RDD1 is still being processed, then if I ack the last event in
> RDD2 that would also ack all events in RDD1, even though they may have not
> been completely processed yet.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Are-Spark-Streaming-RDDs-always-processed-in-order-tp23616.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Feature Generation On Spark

2015-07-04 Thread Michal Čizmazia
Spark Context has a method wholeTextFiles. Is that what you need?
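
If so, a minimal sketch of the idea (sc is the JavaSparkContext; vocabulary,
paths and types are illustrative): each file arrives as a single
(path, content) record, so the whole feature vector for a document is built
inside one map call and nothing has to be merged across executors.

import java.util.Arrays;
import java.util.List;

import org.apache.spark.api.java.JavaPairRDD;

List<String> vocabulary = Arrays.asList("spark", "stream", "kafka");    // static token list

JavaPairRDD<String, String> docs = sc.wholeTextFiles("hdfs:///docs");   // (path, whole file content)
JavaPairRDD<String, int[]> vectors = docs.mapValues(content -> {
    int[] counts = new int[vocabulary.size()];
    for (String token : content.toLowerCase().split("\\s+")) {
        int index = vocabulary.indexOf(token);
        if (index >= 0) counts[index]++;
    }
    return counts;
});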

On 4 July 2015 at 07:04, rishikesh  wrote:
> Hi
>
> I am new to Spark and am working on document classification. Before model
> fitting I need to do feature generation. Each document is to be converted to
> a feature vector. However I am not sure how to do that. While testing
> locally I have a static list of tokens and when I parse a file I do a lookup
> and increment counters.
>
> In the case of Spark I can create an RDD which loads all the documents
> however I am not sure if one file goes to one executor or multiple. If the
> file is split then the feature vectors need to be merged. But I am not able
> to figure out how to do that.
>
> Thanks
> Rishi
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Feature-Generation-On-Spark-tp23617.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Dependency Injection with Spark Java

2015-06-29 Thread Michal Čizmazia
Currently, I am considering using Guava Suppliers for delayed
initialization in workers:

Supplier<T> supplier = (Serializable & Supplier<T>) () -> new T();
Supplier<T> singleton = Suppliers.memoize(supplier);
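
A slightly fuller sketch of the same idea (ExpensiveClient and lines are
hypothetical; note that every deserialized copy of the memoized supplier caches
independently, so this gives roughly one instance per task closure rather than
per JVM unless it is also held in a static field):

import java.io.Serializable;

import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;

Supplier<ExpensiveClient> delegate =
    (Supplier<ExpensiveClient> & Serializable) () -> new ExpensiveClient();
Supplier<ExpensiveClient> lazyClient = Suppliers.memoize(delegate);

// the memoized supplier is captured by the closure and the client is built
// lazily on the worker, the first time get() is called
JavaRDD<String> enriched = lines.map(line -> lazyClient.get().enrich(line));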



On 26 June 2015 at 13:17, Igor Berman  wrote:

> asked myself same question today...actually depends on what you are trying
> to do
> if you want injection into workers code I think it will be a bit hard...
> if only in code that driver executes i.e. in main, it's straight forward
> imho, just create your classes from injector(e.g. spring's application
> context)
>
> On 26 June 2015 at 15:49, Michal Čizmazia  wrote:
>
>> How to use Dependency Injection with Spark Java? Please could you point
>> me to any articles/frameworks?
>>
>> Thanks!
>>
>
>


Dependency Injection with Spark Java

2015-06-26 Thread Michal Čizmazia
How to use Dependency Injection with Spark Java? Please could you point me
to any articles/frameworks?

Thanks!


Re: Using Accumulators in Streaming

2015-06-22 Thread Michal Čizmazia
I stumbled upon zipWithUniqueId/zipWithIndex. Is this what you are looking
for?

https://spark.apache.org/docs/latest/api/java/org/apache/spark/api/java/JavaRDDLike.html#zipWithUniqueId()
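
A minimal sketch of what that would look like per batch (messages is the
JavaDStream<String> from the receiver; the println only shows where the IDs end
up; the IDs are unique but not consecutive):

messages.foreachRDD(rdd -> {
    // pair every element of this batch's RDD with a unique Long ID
    JavaPairRDD<String, Long> withIds = rdd.zipWithUniqueId();
    withIds.foreach(pair -> System.out.println(pair._2() + " -> " + pair._1()));
    return null;
});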


On 22 June 2015 at 06:16, Michal Čizmazia  wrote:

> If I am not mistaken, one way to see the accumulators is that they are
> just write-only for the workers and their value can be read by the driver.
> Therefore they cannot be used for ID generation as you wish.
>
> On 22 June 2015 at 04:30, anshu shukla  wrote:
>
>> But I just want to update the RDD by appending a unique message ID to
>> each element of the RDD, which should be automatically (m++ ...) updated every
>> time a new element comes to the RDD.
>>
>> On Mon, Jun 22, 2015 at 7:05 AM, Michal Čizmazia 
>> wrote:
>>
>>> StreamingContext.sparkContext()
>>>
>>> On 21 June 2015 at 21:32, Will Briggs  wrote:
>>>
>>>> It sounds like accumulators are not necessary in Spark Streaming - see
>>>> this post (
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Shared-variable-in-Spark-Streaming-td11762.html)
>>>> for more details.
>>>>
>>>>
>>>> On June 21, 2015, at 7:31 PM, anshu shukla 
>>>> wrote:
>>>>
>>>>
>>>> In Spark Streaming, since we already have a StreamingContext, which does
>>>> not allow us to have accumulators, we have to get a SparkContext for
>>>> initializing the accumulator value.
>>>> But having 2 Spark contexts will not solve the problem.
>>>>
>>>> Please Help !!
>>>>
>>>> --
>>>> Thanks & Regards,
>>>> Anshu Shukla
>>>>
>>>
>>>
>>
>>
>> --
>> Thanks & Regards,
>> Anshu Shukla
>>
>
>


Re: Using Accumulators in Streaming

2015-06-22 Thread Michal Čizmazia
If I am not mistaken, one way to see the accumulators is that they are just
write-only for the workers and their value can be read by the driver.
Therefore they cannot be used for ID generation as you wish.

On 22 June 2015 at 04:30, anshu shukla  wrote:

> But I just want to update the RDD by appending a unique message ID to
> each element of the RDD, which should be automatically (m++ ...) updated every
> time a new element comes to the RDD.
>
> On Mon, Jun 22, 2015 at 7:05 AM, Michal Čizmazia 
> wrote:
>
>> StreamingContext.sparkContext()
>>
>> On 21 June 2015 at 21:32, Will Briggs  wrote:
>>
>>> It sounds like accumulators are not necessary in Spark Streaming - see
>>> this post (
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Shared-variable-in-Spark-Streaming-td11762.html)
>>> for more details.
>>>
>>>
>>> On June 21, 2015, at 7:31 PM, anshu shukla 
>>> wrote:
>>>
>>>
>>> In Spark Streaming, since we already have a StreamingContext, which does
>>> not allow us to have accumulators, we have to get a SparkContext for
>>> initializing the accumulator value.
>>> But having 2 Spark contexts will not solve the problem.
>>>
>>> Please Help !!
>>>
>>> --
>>> Thanks & Regards,
>>> Anshu Shukla
>>>
>>
>>
>
>
> --
> Thanks & Regards,
> Anshu Shukla
>


Re: Using Accumulators in Streaming

2015-06-21 Thread Michal Čizmazia
StreamingContext.sparkContext()

On 21 June 2015 at 21:32, Will Briggs  wrote:

> It sounds like accumulators are not necessary in Spark Streaming - see
> this post (
> http://apache-spark-user-list.1001560.n3.nabble.com/Shared-variable-in-Spark-Streaming-td11762.html)
> for more details.
>
>
> On June 21, 2015, at 7:31 PM, anshu shukla  wrote:
>
>
> In Spark Streaming, since we already have a StreamingContext, which does not
> allow us to have accumulators, we have to get a SparkContext for
> initializing the accumulator value.
> But having 2 Spark contexts will not solve the problem.
>
> Please Help !!
>
> --
> Thanks & Regards,
> Anshu Shukla
>


Re: Serial batching with Spark Streaming

2015-06-20 Thread Michal Čizmazia
Thank you very much for confirmation.

On 20 June 2015 at 17:21, Tathagata Das  wrote:

> No it does not. By default, only after all the retries etc. related to
> batch X are done will batch X+1 be started.
>
> Yes, one RDD per batch per DStream. However, the RDD could be a union of
> multiple RDDs (e.g. RDDs generated by windowed DStream, or unioned
> DStream).
>
> TD
>
> On Fri, Jun 19, 2015 at 3:16 PM, Michal Čizmazia 
> wrote:
>
>> Thanks Tathagata!
>>
>> I will use *foreachRDD*/*foreachPartition*() instead of *transform*()
>> then.
>>
>> Does the default scheduler initiate the execution of the *batch X+1*
>> after the *batch X* even if tasks for the *batch X* need to be *retried
>> due to failures*? If not, please could you suggest workarounds and point
>> me to the code?
>>
>> One more thing was not 100% clear to me from the documentation: Is there
>> exactly *1 RDD* published *per batch interval* in a DStream?
>>
>>
>>
>> On 19 June 2015 at 16:58, Tathagata Das  wrote:
>>
>>> I see what is the problem. You are adding sleep in the transform
>>> operation. The transform function is called at the time of preparing the
>>> Spark jobs for a batch. It should not be running any time consuming
>>> operation like a RDD action or a sleep. Since this operation needs to run
>>> every batch interval, doing blocking long running operation messes with the
>>> need to run every batch interval.
>>>
>>> I will try to make this clearer in the guide. I had not seen anyone do
>>> something like this before and therefore it did not occur to me that this
>>> could happen. As long as you dont do time consuming blocking operation in
>>> the transform function, the batches will be generated, scheduled and
>>> executed in serial order by default.
>>>
>>> On Fri, Jun 19, 2015 at 11:33 AM, Michal Čizmazia 
>>> wrote:
>>>
>>>> Binh, thank you very much for your comment and code. Please could you
>>>> outline an example use of your stream? I am a newbie to Spark. Thanks 
>>>> again!
>>>>
>>>> On 18 June 2015 at 14:29, Binh Nguyen Van  wrote:
>>>>
>>>>> I haven’t tried with 1.4 but I tried with 1.3 a while ago and I could
>>>>> not get the serialized behavior by using default scheduler when there is
>>>>> failure and retry
>>>>> so I created a customized stream like this.
>>>>>
>>>>> class EachSeqRDD[T: ClassTag] (
>>>>> parent: DStream[T], eachSeqFunc: (RDD[T], Time) => Unit
>>>>>   ) extends DStream[Unit](parent.ssc) {
>>>>>
>>>>>   override def slideDuration: Duration = parent.slideDuration
>>>>>
>>>>>   override def dependencies: List[DStream[_]] = List(parent)
>>>>>
>>>>>   override def compute(validTime: Time): Option[RDD[Unit]] = None
>>>>>
>>>>>   override private[streaming] def generateJob(time: Time): Option[Job] = {
>>>>> val pendingJobs = ssc.scheduler.getPendingTimes().size
>>>>> logInfo("%d job(s) is(are) pending at %s".format(pendingJobs, time))
>>>>> // do not generate new RDD if there is pending job
>>>>> if (pendingJobs == 0) {
>>>>>   parent.getOrCompute(time) match {
>>>>> case Some(rdd) => {
>>>>>   val jobFunc = () => {
>>>>> ssc.sparkContext.setCallSite(creationSite)
>>>>> eachSeqFunc(rdd, time)
>>>>>   }
>>>>>   Some(new Job(time, jobFunc))
>>>>> }
>>>>> case None => None
>>>>>   }
>>>>> }
>>>>> else {
>>>>>   None
>>>>> }
>>>>>   }
>>>>> }
>>>>> object DStreamEx {
>>>>>   implicit class EDStream[T: ClassTag](dStream: DStream[T]) {
>>>>> def eachSeqRDD(func: (RDD[T], Time) => Unit) = {
>>>>>   // because the DStream is reachable from the outer object here, and because
>>>>>   // DStreams can't be serialized with closures, we can't proactively check
>>>>>   // it for serializability and so we pass the optional false to SparkContext.clean
>>>>>   new EachSeqRDD(dStream, dStream.context.sparkContext.clean(func, false)).register()
>>>>> }
>>>>>   }
>>>>> }

Re: Serial batching with Spark Streaming

2015-06-19 Thread Michal Čizmazia
Thanks Tathagata!

I will use *foreachRDD*/*foreachPartition*() instead of *transform*() then.
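
As an illustration of that switch, a minimal Java sketch (placeholder socket
source and names, not code from this thread) that keeps the slow per-batch
work inside foreachRDD(), where it runs as one of the batch's jobs rather than
during job preparation:

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2;

public class ForeachRddSketch {
    public static void main(String[] args) throws InterruptedException {
        final SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("ForeachRddSketch");
        try (final JavaStreamingContext context = new JavaStreamingContext(conf, Durations.seconds(10))) {
            final JavaDStream<String> lines = context.socketTextStream("localhost", 9999); // placeholder source
            final JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split(" ")));
            final JavaPairDStream<String, Integer> counts = words
                    .mapToPair(s -> new Tuple2<String, Integer>(s, 1))
                    .reduceByKey((i1, i2) -> i1 + i2);

            // Slow, blocking work goes into an output operation: it becomes a job of
            // this batch, and by default the next batch starts only after the jobs
            // (including retries) of this batch have finished.
            counts.foreachRDD(rdd -> {
                Thread.sleep(5000);                              // stand-in for a slow save
                rdd.collect().forEach(System.out::println);
                return null;  // Function<JavaPairRDD<K, V>, Void> in the 1.4 Java API
            });

            context.start();
            context.awaitTermination();
        }
    }
}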

Does the default scheduler initiate the execution of the *batch X+1* after
the *batch X* even if tasks for the *batch X* need to be *retried due to
failures*? If not, please could you suggest workarounds and point me to the
code?

One more thing was not 100% clear to me from the documentation: Is there
exactly *1 RDD* published *per batch interval* in a DStream?



On 19 June 2015 at 16:58, Tathagata Das  wrote:

> I see what the problem is. You are adding a sleep in the transform
> operation. The transform function is called at the time of preparing the
> Spark jobs for a batch. It should not run any time-consuming
> operation like an RDD action or a sleep. Since this operation needs to run
> every batch interval, a blocking, long-running operation there interferes
> with the need to run every batch interval.
>
> I will try to make this clearer in the guide. I had not seen anyone do
> something like this before, and therefore it did not occur to me that this
> could happen. As long as you don't do time-consuming blocking operations in
> the transform function, the batches will be generated, scheduled and
> executed in serial order by default.
>
> On Fri, Jun 19, 2015 at 11:33 AM, Michal Čizmazia 
> wrote:
>
>> Binh, thank you very much for your comment and code. Please could you
>> outline an example use of your stream? I am a newbie to Spark. Thanks again!
>>
>> On 18 June 2015 at 14:29, Binh Nguyen Van  wrote:
>>
>>> I haven’t tried with 1.4 but I tried with 1.3 a while ago and I could
>>> not get the serialized behavior by using default scheduler when there is
>>> failure and retry
>>> so I created a customized stream like this.
>>>
>>> class EachSeqRDD[T: ClassTag] (
>>> parent: DStream[T], eachSeqFunc: (RDD[T], Time) => Unit
>>>   ) extends DStream[Unit](parent.ssc) {
>>>
>>>   override def slideDuration: Duration = parent.slideDuration
>>>
>>>   override def dependencies: List[DStream[_]] = List(parent)
>>>
>>>   override def compute(validTime: Time): Option[RDD[Unit]] = None
>>>
>>>   override private[streaming] def generateJob(time: Time): Option[Job] = {
>>> val pendingJobs = ssc.scheduler.getPendingTimes().size
>>> logInfo("%d job(s) is(are) pending at %s".format(pendingJobs, time))
>>> // do not generate new RDD if there is pending job
>>> if (pendingJobs == 0) {
>>>   parent.getOrCompute(time) match {
>>> case Some(rdd) => {
>>>   val jobFunc = () => {
>>> ssc.sparkContext.setCallSite(creationSite)
>>> eachSeqFunc(rdd, time)
>>>   }
>>>   Some(new Job(time, jobFunc))
>>> }
>>> case None => None
>>>   }
>>> }
>>> else {
>>>   None
>>> }
>>>   }
>>> }
>>> object DStreamEx {
>>>   implicit class EDStream[T: ClassTag](dStream: DStream[T]) {
>>> def eachSeqRDD(func: (RDD[T], Time) => Unit) = {
>>>   // because the DStream is reachable from the outer object here, and because
>>>   // DStreams can't be serialized with closures, we can't proactively check
>>>   // it for serializability and so we pass the optional false to SparkContext.clean
>>>   new EachSeqRDD(dStream, dStream.context.sparkContext.clean(func, false)).register()
>>> }
>>>   }
>>> }
>>>
>>> -Binh
>>> ​
>>>
>>> On Thu, Jun 18, 2015 at 10:49 AM, Michal Čizmazia 
>>> wrote:
>>>
>>>> Tathagata, thanks for your response. You are right! Everything seems
>>>> to work as expected.
>>>>
>>>> Please could you help me understand why the time for processing of all
>>>> jobs for a batch is always less than 4 seconds?
>>>>
>>>> Please see my playground code below.
>>>>
>>>> The last modified time of the input (lines) RDD dump files seems to
>>>> match the Thread.sleep delays (20s or 5s) in the transform operation
>>>> or the batching interval (10s): 20s, 5s, 10s.
>>>>
>>>> However, neither the batch processing time in the Streaming tab nor
>>>> the last modified time of the output (words) RDD dump files reflect
>>>> the Thread.sleep delays.
>>>>
>>>> 07:20   3240  001_lines_...
>&

Re: Serial batching with Spark Streaming

2015-06-19 Thread Michal Čizmazia
Binh, thank you very much for your comment and code. Please could you
outline an example use of your stream? I am a newbie to Spark. Thanks again!

On 18 June 2015 at 14:29, Binh Nguyen Van  wrote:

> I haven’t tried with 1.4 but I tried with 1.3 a while ago and I could not
> get the serialized behavior by using default scheduler when there is
> failure and retry
> so I created a customized stream like this.
>
> class EachSeqRDD[T: ClassTag] (
> parent: DStream[T], eachSeqFunc: (RDD[T], Time) => Unit
>   ) extends DStream[Unit](parent.ssc) {
>
>   override def slideDuration: Duration = parent.slideDuration
>
>   override def dependencies: List[DStream[_]] = List(parent)
>
>   override def compute(validTime: Time): Option[RDD[Unit]] = None
>
>   override private[streaming] def generateJob(time: Time): Option[Job] = {
> val pendingJobs = ssc.scheduler.getPendingTimes().size
> logInfo("%d job(s) is(are) pending at %s".format(pendingJobs, time))
> // do not generate new RDD if there is pending job
> if (pendingJobs == 0) {
>   parent.getOrCompute(time) match {
> case Some(rdd) => {
>   val jobFunc = () => {
> ssc.sparkContext.setCallSite(creationSite)
> eachSeqFunc(rdd, time)
>   }
>   Some(new Job(time, jobFunc))
> }
> case None => None
>   }
> }
> else {
>   None
> }
>   }
> }
> object DStreamEx {
>   implicit class EDStream[T: ClassTag](dStream: DStream[T]) {
> def eachSeqRDD(func: (RDD[T], Time) => Unit) = {
>   // because the DStream is reachable from the outer object here, and because
>   // DStreams can't be serialized with closures, we can't proactively check
>   // it for serializability and so we pass the optional false to SparkContext.clean
>   new EachSeqRDD(dStream, dStream.context.sparkContext.clean(func, false)).register()
> }
>   }
> }
>
> -Binh
> ​
>
> On Thu, Jun 18, 2015 at 10:49 AM, Michal Čizmazia 
> wrote:
>
>> Tathagata, thanks for your response. You are right! Everything seems
>> to work as expected.
>>
>> Please could you help me understand why the time for processing of all
>> jobs for a batch is always less than 4 seconds?
>>
>> Please see my playground code below.
>>
>> The last modified time of the input (lines) RDD dump files seems to
>> match the Thread.sleep delays (20s or 5s) in the transform operation
>> or the batching interval (10s): 20s, 5s, 10s.
>>
>> However, neither the batch processing time in the Streaming tab nor
>> the last modified time of the output (words) RDD dump files reflect
>> the Thread.sleep delays.
>>
>> 07:20   3240  001_lines_...
>>   07:21 117   001_words_...
>> 07:41   37224 002_lines_...
>>   07:43 252   002_words_...
>> 08:00   37728 003_lines_...
>>   08:02 504   003_words_...
>> 08:20   38952 004_lines_...
>>   08:22 756   004_words_...
>> 08:40   38664 005_lines_...
>>   08:42 999   005_words_...
>> 08:45   38160 006_lines_...
>>   08:47 1134  006_words_...
>> 08:50   9720  007_lines_...
>>   08:51 1260  007_words_...
>> 08:55   9864  008_lines_...
>>   08:56 1260  008_words_...
>> 09:00   10656 009_lines_...
>>   09:01 1395  009_words_...
>> 09:05   11664 010_lines_...
>>   09:06 1395  010_words_...
>> 09:11   10935 011_lines_...
>>   09:11 1521  011_words_...
>> 09:16   11745 012_lines_...
>>   09:16 1530  012_words_...
>> 09:21   12069 013_lines_...
>>   09:22 1656  013_words_...
>> 09:27   10692 014_lines_...
>>   09:27 1665  014_words_...
>> 09:32   10449 015_lines_...
>>   09:32 1791  015_words_...
>> 09:37   11178 016_lines_...
>>   09:37 1800  016_words_...
>> 09:45   17496 017_lines_...
>>   09:45 1926  017_words_...
>> 09:55   22032 018_lines_...
>>   09:56 2061  018_words_...
>> 10:05   21951 019_lines_...
>>   10:06 2196  019_words_...
>> 10:15   21870 020_lines_...
>>   10:16 2322  020_words_...
>> 10:25   21303 021_lines_...
>>   10:26 2340  021_words_...
>>
>>
>> final SparkConf conf = new
>> SparkConf().setMaster("local[4]").setAppName("WordCount");
>> try (final JavaStreamingContext context = new
>> JavaStreamingContext(conf, Durations.seconds(10))) {
>>
>> context.checkpoint("/tmp/che

Re: Serial batching with Spark Streaming

2015-06-18 Thread Michal Čizmazia
Tathagata,

Please could you confirm that batches are not processed in parallel
during retries in Spark 1.4? See Binh's email copied below. Any
pointers for workarounds if necessary?

Thanks!


On 18 June 2015 at 14:29, Binh Nguyen Van  wrote:
> I haven’t tried with 1.4 but I tried with 1.3 a while ago and I could not
> get the serialized behavior by using default scheduler when there is failure
> and retry
> so I created a customized stream like this.
>
> class EachSeqRDD[T: ClassTag] (
> parent: DStream[T], eachSeqFunc: (RDD[T], Time) => Unit
>   ) extends DStream[Unit](parent.ssc) {
>
>   override def slideDuration: Duration = parent.slideDuration
>
>   override def dependencies: List[DStream[_]] = List(parent)
>
>   override def compute(validTime: Time): Option[RDD[Unit]] = None
>
>   override private[streaming] def generateJob(time: Time): Option[Job] = {
> val pendingJobs = ssc.scheduler.getPendingTimes().size
> logInfo("%d job(s) is(are) pending at %s".format(pendingJobs, time))
> // do not generate new RDD if there is pending job
> if (pendingJobs == 0) {
>   parent.getOrCompute(time) match {
> case Some(rdd) => {
>   val jobFunc = () => {
> ssc.sparkContext.setCallSite(creationSite)
> eachSeqFunc(rdd, time)
>   }
>   Some(new Job(time, jobFunc))
> }
> case None => None
>   }
> }
> else {
>   None
> }
>   }
> }
>
> object DStreamEx {
>   implicit class EDStream[T: ClassTag](dStream: DStream[T]) {
> def eachSeqRDD(func: (RDD[T], Time) => Unit) = {
>   // because the DStream is reachable from the outer object here, and because
>   // DStreams can't be serialized with closures, we can't proactively check
>   // it for serializability and so we pass the optional false to SparkContext.clean
>   new EachSeqRDD(dStream, dStream.context.sparkContext.clean(func, false)).register()
> }
>   }
> }
>
> -Binh
>
>
> On Thu, Jun 18, 2015 at 10:49 AM, Michal Čizmazia  wrote:
>>
>> Tathagata, thanks for your response. You are right! Everything seems
>> to work as expected.
>>
>> Please could you help me understand why the time for processing of all
>> jobs for a batch is always less than 4 seconds?
>>
>> Please see my playground code below.
>>
>> The last modified time of the input (lines) RDD dump files seems to
>> match the Thread.sleep delays (20s or 5s) in the transform operation
>> or the batching interval (10s): 20s, 5s, 10s.
>>
>> However, neither the batch processing time in the Streaming tab nor
>> the last modified time of the output (words) RDD dump files reflect
>> the Thread.sleep delays.
>>
>> 07:20   3240  001_lines_...
>>   07:21 117   001_words_...
>> 07:41   37224 002_lines_...
>>   07:43 252   002_words_...
>> 08:00   37728 003_lines_...
>>   08:02 504   003_words_...
>> 08:20   38952 004_lines_...
>>   08:22 756   004_words_...
>> 08:40   38664 005_lines_...
>>   08:42 999   005_words_...
>> 08:45   38160 006_lines_...
>>   08:47 1134  006_words_...
>> 08:50   9720  007_lines_...
>>   08:51 1260  007_words_...
>> 08:55   9864  008_lines_...
>>   08:56 1260  008_words_...
>> 09:00   10656 009_lines_...
>>   09:01 1395  009_words_...
>> 09:05   11664 010_lines_...
>>   09:06 1395  010_words_...
>> 09:11   10935 011_lines_...
>>   09:11 1521  011_words_...
>> 09:16   11745 012_lines_...
>>   09:16 1530  012_words_...
>> 09:21   12069 013_lines_...
>>   09:22 1656  013_words_...
>> 09:27   10692 014_lines_...
>>   09:27 1665  014_words_...
>> 09:32   10449 015_lines_...
>>   09:32 1791  015_words_...
>> 09:37   11178 016_lines_...
>>   09:37 1800  016_words_...
>> 09:45   17496 017_lines_...
>>   09:45 1926  017_words_...
>> 09:55   22032 018_lines_...
>>   09:56 2061  018_words_...
>> 10:05   21951 019_lines_...
>>   10:06 2196  019_words_...
>> 10:15   21870 020_lines_...
>>   10:16 2322  020_words_...
>> 10:25   21303 021_lines_...
>>   10:26 2340  021_words_...
>>
>>
>> final SparkConf conf = new
>> SparkConf().setMaster("local[4]").setAppName("WordCount");
>> try (final JavaStreamingContext context = new
>> JavaStreamingContext(conf, Durations.seconds(10))) {
>>
>> 

Re: Serial batching with Spark Streaming

2015-06-18 Thread Michal Čizmazia
Tathagata, thanks for your response. You are right! Everything seems
to work as expected.

Please could you help me understand why the time for processing of all
jobs for a batch is always less than 4 seconds?

Please see my playground code below.

The last modified time of the input (lines) RDD dump files seems to
match the Thread.sleep delays (20s or 5s) in the transform operation
or the batching interval (10s): 20s, 5s, 10s.

However, neither the batch processing time in the Streaming tab nor
the last modified time of the output (words) RDD dump files reflect
the Thread.sleep delays.

07:20   3240  001_lines_...
  07:21 117   001_words_...
07:41   37224 002_lines_...
  07:43 252   002_words_...
08:00   37728 003_lines_...
  08:02 504   003_words_...
08:20   38952 004_lines_...
  08:22 756   004_words_...
08:40   38664 005_lines_...
  08:42 999   005_words_...
08:45   38160 006_lines_...
  08:47 1134  006_words_...
08:50   9720  007_lines_...
  08:51 1260  007_words_...
08:55   9864  008_lines_...
  08:56 1260  008_words_...
09:00   10656 009_lines_...
  09:01 1395  009_words_...
09:05   11664 010_lines_...
  09:06 1395  010_words_...
09:11   10935 011_lines_...
  09:11 1521  011_words_...
09:16   11745 012_lines_...
  09:16 1530  012_words_...
09:21   12069 013_lines_...
  09:22 1656  013_words_...
09:27   10692 014_lines_...
  09:27 1665  014_words_...
09:32   10449 015_lines_...
  09:32 1791  015_words_...
09:37   11178 016_lines_...
  09:37 1800  016_words_...
09:45   17496 017_lines_...
  09:45 1926  017_words_...
09:55   22032 018_lines_...
  09:56 2061  018_words_...
10:05   21951 019_lines_...
  10:06 2196  019_words_...
10:15   21870 020_lines_...
  10:16 2322  020_words_...
10:25   21303 021_lines_...
  10:26 2340  021_words_...


final SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("WordCount");
try (final JavaStreamingContext context = new JavaStreamingContext(conf, Durations.seconds(10))) {

    context.checkpoint("/tmp/checkpoint");

    final JavaDStream<String> lines = context.union(
        context.receiverStream(new GeneratorReceiver()),
        ImmutableList.of(
            context.receiverStream(new GeneratorReceiver()),
            context.receiverStream(new GeneratorReceiver())));

    lines.print();

    final Accumulator<Integer> lineRddIndex = context.sparkContext().accumulator(0);
    lines.foreachRDD(rdd -> {
        lineRddIndex.add(1);
        final String prefix = "/tmp/" + String.format("%03d", lineRddIndex.localValue()) + "_lines_";
        try (final PrintStream out = new PrintStream(prefix + UUID.randomUUID())) {
            rdd.collect().forEach(s -> out.println(s));
        }
        return null;
    });

    final JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split(" ")));
    final JavaPairDStream<String, Integer> pairs =
        words.mapToPair(s -> new Tuple2<String, Integer>(s, 1));
    final JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey((i1, i2) -> i1 + i2);

    final Accumulator<Integer> sleep = context.sparkContext().accumulator(0);
    final JavaPairDStream<String, Integer> wordCounts2 = JavaPairDStream.fromJavaDStream(
        wordCounts.transform(rdd -> {
            sleep.add(1);
            Thread.sleep(sleep.localValue() < 6 ? 20000 : 5000);
            return JavaRDD.fromRDD(JavaPairRDD.toRDD(rdd), rdd.classTag());
        }));

    final Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction =
        (values, state) -> {
            Integer newSum = state.or(0);
            for (final Integer value : values) {
                newSum += value;
            }
            return Optional.of(newSum);
        };

    final List<Tuple2<String, Integer>> tuples = ImmutableList.<Tuple2<String, Integer>>of();
    final JavaPairRDD<String, Integer> initialRDD = context.sparkContext().parallelizePairs(tuples);

    final JavaPairDStream<String, Integer> wordCountsState = wordCounts2.updateStateByKey(
        updateFunction,
        new HashPartitioner(context.sparkContext().defaultParallelism()),
        initialRDD);

    wordCountsState.print();

    final Accumulator<Integer> rddIndex = context.sparkContext().accumulator(0);
    wordCountsState.foreachRDD(rdd -> {
        rddIndex.add(1);
        final String prefix = "/tmp/" + String.format("%03d", rddIndex.localValue()) + "_words_";
        try (final PrintStream out = new PrintStream(prefix + UUID.randomUUID())) {
            rdd.collect().forEach(s -> out.println(s));
        }
        return null;
    });

    context.start();
    context.awaitTermination();
}


On 17 June 2015 at 17:25, Tathagata Das  wrote:
> The default behavior should be that batch X + 1 starts processing only after
> batch X completes. If you are using Spark 1.4.0, could you show us a
> screenshot of the streaming tab, especially the list of

Serial batching with Spark Streaming

2015-06-17 Thread Michal Čizmazia
Is it possible to achieve serial batching with Spark Streaming?

Example:

I configure the Streaming Context to create a batch every 3 seconds.

Processing of batch #2 takes longer than 3 seconds and creates a
backlog of batches:

batch #1 takes 2s
batch #2 takes 10s
batch #3 takes 2s
batch #4 takes 2s

When testing locally, it seems that processing of multiple batches is
finished at the same time:

batch #1 finished at 2s
batch #2 finished at 12s
batch #3 finished at 12s (processed in parallel)
batch #4 finished at 15s

How can I delay processing of the next batch, so that it is processed
only after processing of the previous batch has been completed?

batch #1 finished at 2s
batch #2 finished at 12s
batch #3 finished at 14s (processed serially)
batch #4 finished at 16s

I want to perform a transformation for every key only once in a given
period of time (e.g. batch duration). I find all unique keys in a
batch and perform the transformation on each key. To ensure that the
transformation is done for every key only once, only one batch can be
processed at a time. At the same time, I want that single batch to be
processed in parallel.

context = new JavaStreamingContext(conf, Durations.seconds(10));
stream = context.receiverStream(...);
stream
.reduceByKey(...)
.transform(...)
.foreachRDD(output);

Any ideas or pointers are very welcome.

Thanks!




Re: Reliable SQS Receiver for Spark Streaming

2015-06-13 Thread Michal Čizmazia
Thanks Akhil!

I just looked it up in the code as well.

Receiver.store(ArrayBuffer[T], ...)
ReceiverSupervisorImpl.pushArrayBuffer(ArrayBuffer[T], ...)
ReceiverSupervisorImpl.pushAndReportBlock(...)
WriteAheadLogBasedBlockHandler.storeBlock(...)

"This implementation stores the block into the block manager as well as a
write ahead log. It does this in parallel, using Scala Futures, and returns
only after the block has been stored in both places."

https://www.codatlas.com/github.com/apache/spark/master/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala?keyword=WriteAheadLogBasedBlockHandler&line=160
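
As a concrete illustration of this pattern, here is a sketch (not code from
this thread; the AWS SDK calls, client setup, and queue URL are illustrative
assumptions) of a custom reliable receiver that stores a batch of SQS message
bodies with the blocking store(...) call and deletes the messages only after
it returns:

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;

import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClient;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;

// Sketch of a "reliable" SQS receiver: messages are deleted from the queue
// only after the blocking store(multiple-records) call has returned, i.e.
// after the block has been stored (and, with WAL enabled, logged).
public class SqsReceiver extends Receiver<String> {

    private final String queueUrl;

    public SqsReceiver(String queueUrl) {
        super(StorageLevel.MEMORY_AND_DISK_SER());
        this.queueUrl = queueUrl;
    }

    @Override
    public void onStart() {
        new Thread(this::receive, "sqs-receiver").start();
    }

    @Override
    public void onStop() {
        // The receive loop checks isStopped() and exits on its own.
    }

    private void receive() {
        final AmazonSQS sqs = new AmazonSQSClient();   // illustrative client setup
        try {
            while (!isStopped()) {
                final List<Message> messages = sqs.receiveMessage(
                        new ReceiveMessageRequest(queueUrl)
                                .withMaxNumberOfMessages(10)
                                .withWaitTimeSeconds(20))
                        .getMessages();
                if (messages.isEmpty()) {
                    continue;
                }
                final List<String> records = new ArrayList<>();
                for (final Message message : messages) {
                    records.add(message.getBody());
                }
                // Blocks until the block has been stored inside Spark.
                store(records.iterator());
                // Only now is it safe to delete the messages from SQS.
                for (final Message message : messages) {
                    sqs.deleteMessage(queueUrl, message.getReceiptHandle());
                }
            }
        } catch (final Exception e) {
            restart("Error receiving from SQS", e);
        }
    }
}

Since store(...) returns only after the block is in the block manager and,
with the write ahead log enabled, in the log as well, deleting the messages at
that point should avoid losing received data if the receiver fails.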


On 13 June 2015 at 06:46, Akhil Das  wrote:
> Yes, if you have enabled WAL and checkpointing then after the store, you can
> simply delete the SQS Messages from your receiver.
>
> Thanks
> Best Regards
>
> On Sat, Jun 13, 2015 at 6:14 AM, Michal Čizmazia  wrote:
>>
>> I would like to have a Spark Streaming SQS Receiver which deletes SQS
>> messages only after they were successfully stored on S3.
>>
>> For this a Custom Receiver can be implemented with the semantics of
>> the Reliable Receiver.
>>
>> The store(multiple-records) call blocks until the given records have
>> been stored and replicated inside Spark.
>>
>> If the write-ahead logs are enabled, all the data received from a
>> receiver gets written into a write ahead log in the configured
>> checkpoint directory. The checkpoint directory can be pointed to S3.
>>
>> After the store(multiple-records) blocking call finishes, are the
>> records already stored in the checkpoint directory (and thus can be
>> safely deleted from SQS)?
>>
>>
>




Reliable SQS Receiver for Spark Streaming

2015-06-12 Thread Michal Čizmazia
I would like to have a Spark Streaming SQS Receiver which deletes SQS
messages only after they were successfully stored on S3.

For this, a Custom Receiver can be implemented with the semantics of
the Reliable Receiver.

The store(multiple-records) call blocks until the given records have
been stored and replicated inside Spark.

If the write-ahead logs are enabled, all the data received from a
receiver gets written into a write ahead log in the configured
checkpoint directory. The checkpoint directory can be pointed to S3.

After the store(multiple-records) blocking call finishes, are the
records already stored in the checkpoint directory (and thus can be
safely deleted from SQS)?
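
For illustration, a minimal sketch of the setup this assumes (the bucket,
queue URL, output path, and the SqsReceiver from the sketch earlier in this
thread are placeholders): the receiver write ahead log is enabled and the
checkpoint directory points at S3:

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class SqsStreamSetupSketch {
    public static void main(String[] args) throws InterruptedException {
        final SparkConf conf = new SparkConf()
                .setAppName("SqsToS3")
                // Write received data to a write ahead log under the checkpoint directory.
                .set("spark.streaming.receiver.writeAheadLog.enable", "true");

        try (final JavaStreamingContext context = new JavaStreamingContext(conf, Durations.minutes(1))) {
            // Checkpoint directory (and therefore the WAL) on S3; the bucket is a placeholder.
            context.checkpoint("s3n://my-bucket/spark-checkpoints");

            final JavaDStream<String> records = context.receiverStream(
                    new SqsReceiver("https://sqs.us-east-1.amazonaws.com/123456789012/my-queue"));

            records.foreachRDD(rdd -> {
                rdd.saveAsTextFile("s3n://my-bucket/output/" + System.currentTimeMillis()); // placeholder sink
                return null;
            });

            context.start();
            context.awaitTermination();
        }
    }
}

Whether the records are already in the checkpoint directory when store(...)
returns is exactly the question above; the reply earlier in this thread points
at WriteAheadLogBasedBlockHandler.storeBlock, which returns only after the
block is in both the block manager and the write ahead log.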
