Re: Sessionization using updateStateByKey

2015-07-15 Thread Sean McNamara
I would just like to add that we do the very same/similar thing at Webtrends; 
updateStateByKey has been a life-saver for our sessionization use cases.

Cheers,

Sean


On Jul 15, 2015, at 11:20 AM, Silvio Fiorito silvio.fior...@granturing.com wrote:

Hi Cody,

I’ve had success using updateStateByKey for real-time sessionization by aging 
off timed-out sessions (returning None in the update function). This was on a 
large commercial website with millions of hits per day. This was over a year 
ago so I don’t have access to the stats any longer for length of sessions 
unfortunately, but I seem to remember they were around 10-30 minutes long. Even 
with peaks in volume, Spark managed to keep up very well.
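
For anyone digging through the archives, here is a minimal sketch of that aging-out pattern (the Event/Session types, timeout, source, and checkpoint path are placeholders, not our production code):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

case class Event(userId: String, timestamp: Long)
case class Session(firstSeen: Long, lastSeen: Long, hits: Int)

object SessionizeSketch {
  val sessionTimeoutMs = 30 * 60 * 1000L  // age out sessions idle for ~30 minutes

  // Called once per key per batch; returning None ages the session out of the state.
  def updateSession(events: Seq[Event], state: Option[Session]): Option[Session] = {
    val updated = events.foldLeft(state) {
      case (Some(s), e) => Some(s.copy(lastSeen = math.max(s.lastSeen, e.timestamp), hits = s.hits + 1))
      case (None, e)    => Some(Session(e.timestamp, e.timestamp, 1))
    }
    updated.filter(s => System.currentTimeMillis() - s.lastSeen < sessionTimeoutMs)
  }

  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("Sessionize"), Seconds(10))
    ssc.checkpoint("/tmp/sessionize-checkpoint")  // required for updateStateByKey

    // Placeholder source: "userId,timestamp" lines over a socket.
    val events = ssc.socketTextStream("localhost", 9999)
      .map { line => val f = line.split(","); (f(0), Event(f(0), f(1).toLong)) }

    events.updateStateByKey(updateSession _).print()

    ssc.start()
    ssc.awaitTermination()
  }
}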

Thanks,
Silvio

From: Cody Koeninger
Date: Wednesday, July 15, 2015 at 5:38 PM
To: algermissen1971
Cc: Tathagata Das, swetha, user
Subject: Re: Sessionization using updateStateByKey

An in-memory hash key data structure of some kind so that you're close to 
linear on the number of items in a batch, not the number of outstanding keys.  
That's more complex, because you have to deal with expiration for keys that 
never get hit, and for unusually long sessions you have to either drop them or 
hit durable storage.
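
Roughly the kind of structure I mean, as a single-JVM sketch (not a complete Spark job - in practice this would live per partition or be backed by durable storage, and the key/state types are placeholders):

import scala.collection.mutable

// One entry per open session, touched only when that key shows up in a batch.
case class SessionState(var lastSeen: Long, var hits: Int)

class SessionTracker(timeoutMs: Long) {
  private val sessions = mutable.HashMap.empty[String, SessionState]

  // Cost is proportional to the batch size, not to the number of outstanding keys.
  def updateBatch(batch: Seq[(String, Long)]): Unit =
    batch.foreach { case (key, ts) =>
      sessions.get(key) match {
        case Some(s) => s.lastSeen = math.max(s.lastSeen, ts); s.hits += 1
        case None    => sessions(key) = SessionState(ts, 1)
      }
    }

  // Run this occasionally (not every batch) to expire keys that never get hit again.
  def expire(now: Long): Seq[(String, SessionState)] = {
    val dead = sessions.filter { case (_, s) => now - s.lastSeen > timeoutMs }.toSeq
    dead.foreach { case (key, _) => sessions.remove(key) }
    dead
  }
}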

Maybe someone has a better idea, I'd like to hear it.

On Wed, Jul 15, 2015 at 8:54 AM, algermissen1971 algermissen1...@icloud.com wrote:
Hi Cody,

oh ... I thought that was one of *the* use cases for it. Do you have a 
suggestion / best practice how to achieve the same thing with better scaling 
characteristics?

Jan

On 15 Jul 2015, at 15:33, Cody Koeninger c...@koeninger.org wrote:

 I personally would try to avoid updateStateByKey for sessionization when you 
 have long sessions / a lot of keys, because it's linear on the number of keys.

 On Tue, Jul 14, 2015 at 6:25 PM, Tathagata Das t...@databricks.com wrote:
 [Apologies for repost, for those who have seen this response already in the 
 dev mailing list]

 1. When you set ssc.checkpoint(checkpointDir), Spark Streaming periodically 
 saves the state RDD (which is a snapshot of all the state data) to HDFS using 
 RDD checkpointing. In fact, a streaming app with updateStateByKey will not 
 start until you set a checkpoint directory.

 2. The updateStateByKey performance is largely independent of the source being 
 used - receiver-based or direct Kafka. The absolute performance obviously 
 depends on a LOT of variables: size of the cluster, parallelization, etc. The 
 key thing is that you must ensure sufficient parallelization at every stage - 
 receiving, shuffles (updateStateByKey included), and output.
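
 For example, a rough sketch of raising the parallelism of the stateful stage (the partition count, key/value types, and toy update function are placeholders):

import org.apache.spark.HashPartitioner
import org.apache.spark.streaming.dstream.DStream

// Hypothetical helper: spread receiver output and run the updateStateByKey
// shuffle (and the resulting state RDD) across an explicit number of partitions.
def countPerKeyWithParallelism(
    events: DStream[(String, Long)],
    numPartitions: Int = 64): DStream[(String, Long)] = {

  // Toy state: a running count per key.
  def update(values: Seq[Long], state: Option[Long]): Option[Long] =
    Some(state.getOrElse(0L) + values.size)

  events
    .repartition(numPartitions)                                     // parallelize the receiving side
    .updateStateByKey(update _, new HashPartitioner(numPartitions)) // parallelize the stateful shuffle
}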

 Some more discussion in my talk - https://www.youtube.com/watch?v=d5UJonrruHk



 On Tue, Jul 14, 2015 at 4:13 PM, swetha swethakasire...@gmail.com wrote:

 Hi,

 I have a question regarding sessionization using updateStateByKey. If near
 real time state needs to be maintained in a Streaming application, what
 happens when the number of RDDs to maintain the state becomes very large?
 Does it automatically get saved to HDFS and reloaded when needed, or do I have
 to use code like ssc.checkpoint(checkpointDir)?  Also, how is the
 performance if I use both DStream Checkpointing for maintaining the state
 and use Kafka Direct approach for exactly once semantics?


 Thanks,
 Swetha



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Sessionization-using-updateStateByKey-tp23838.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.









Re: Counters in Spark

2015-02-13 Thread Sean McNamara
.map is just a transformation, so no work will actually be performed until 
something takes action against it.  Try adding a .count(), like so:

inputRDD.map { x => {
  counter += 1
} }.count()

In case it is helpful, here are the docs on what exactly the transformations 
and actions are:
http://spark.apache.org/docs/1.2.0/programming-guide.html#transformations
http://spark.apache.org/docs/1.2.0/programming-guide.html#actions
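
Putting it together, a minimal self-contained sketch (using a toy RDD and a plain Int accumulator rather than your actual input):

import org.apache.spark.{SparkConf, SparkContext}

object CounterExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("CounterExample"))
    val counter = sc.accumulator(0, "Counter")

    // foreach is an action, so the job actually runs and the accumulator is updated.
    sc.parallelize(1 to 9).foreach(_ => counter += 1)

    println(counter.value)  // prints 9
    sc.stop()
  }
}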

Cheers,

Sean


On Feb 13, 2015, at 9:50 AM, nitinkak001 nitinkak...@gmail.com wrote:

I am trying to implement counters in Spark and I guess Accumulators are the
way to do it.

My motive is to update a counter in a map function and access/reset it in the
driver code. However, the println statement at the end still yields value
0 (it should be 9). Am I doing something wrong?

def main(args: Array[String]) {

   val conf = new SparkConf().setAppName("SortedNeighbourhoodMatching")
   val sc = new SparkContext(conf)
   var counter = sc.accumulable(0, "Counter")
   var inputFilePath = args(0)
   val inputRDD = sc.textFile(inputFilePath)

   inputRDD.map { x => {
     counter += 1
   } }
   println(counter.value)
}




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Counters-in-Spark-tp21646.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: Spark (SQL) as OLAP engine

2015-02-03 Thread Sean McNamara
We have gone down a similar path at Webtrends, Spark has worked amazingly well 
for us in this use case.  Our solution goes from REST, directly into spark, and 
back out to the UI instantly.

Here is the resulting product in case you are curious (and please pardon the 
self promotion): 
https://www.webtrends.com/support-training/training/explore-onboarding/


 How can I automatically cache the data once a day...

If you are not memory-bound, you could easily cache the daily results for some 
span of time and re-union them together each time you add new data.  You would 
service queries off the unioned RDD.


 ... and make them available on a web service

From the unioned RDD you could always step into spark SQL at that point.  Or 
you could use a simple scatter/gather pattern for this.  As with all things 
Spark, this is super easy to do: just use aggregate()()!
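
To make that concrete, here is a rough sketch of the cache-and-union idea (the Hit type, storage level, and refresh flow are placeholders, not our actual implementation); the web service would then run its Spark SQL or aggregate() queries against whatever `current` returns:

import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

case class Hit(day: String, page: String, count: Long)  // placeholder aggregate record

class DailyCache {
  @volatile private var working: Option[RDD[Hit]] = None

  // Called once per day with that day's pre-aggregated data.
  def addDay(newDay: RDD[Hit]): RDD[Hit] = {
    val unioned = working.map(_.union(newDay)).getOrElse(newDay)
      .persist(StorageLevel.MEMORY_ONLY)
    unioned.count()                  // force materialization so queries stay fast
    working.foreach(_.unpersist())   // release the previous generation
    working = Some(unioned)
    unioned
  }

  // Queries (Spark SQL, aggregate(), etc.) are serviced off this RDD.
  def current: Option[RDD[Hit]] = working
}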


Cheers,

Sean


On Feb 3, 2015, at 9:59 AM, Adamantios Corais adamantios.cor...@gmail.com wrote:

Hi,

After some research I have decided that Spark (SQL) would be ideal for building 
an OLAP engine. My goal is to push aggregated data (to Cassandra or other 
low-latency data storage) and then be able to project the results on a web page 
(web service). New data will be added (aggregated) once a day, only. On the 
other hand, the web service must be able to run some fixed(?) queries (either 
on Spark or Spark SQL) at anytime and plot the results with D3.js. Note that I 
can already achieve similar speeds while in REPL mode by caching the data. 
Therefore, I believe that my problem must be re-phrased as follows: How can I 
automatically cache the data once a day and make them available on a web 
service that is capable of running any Spark or Spark (SQL)  statement in order 
to plot the results with D3.js?

Note that I have already some experience in Spark (+Spark SQL) as well as D3.js 
but not at all with OLAP engines (at least in their traditional form).

Any ideas or suggestions?

// Adamantios





Re: Large dataset, reduceByKey - java heap space error

2015-01-22 Thread Sean McNamara
Hi Kane-

http://spark.apache.org/docs/latest/tuning.html has excellent information that 
may be helpful.  In particular increasing the number of tasks may help, as well 
as confirming that you don’t have more data than you're expecting landing on a 
key.

Also, if you are using Spark < 1.2.0, setting spark.shuffle.manager=sort was a 
huge help for many of our shuffle-heavy workloads (this is the default in 1.2.0 
now).
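
For illustration, a rough sketch of both knobs (the paths, key extraction, and task count are made up - tune them to your data):

// Assuming sc is an existing SparkContext (e.g., in spark-shell). On versions
// before 1.2.0 you would also set spark.shuffle.manager=sort in the SparkConf:
//   new SparkConf().set("spark.shuffle.manager", "sort")

val pairs = sc.textFile("hdfs:///data/input")      // placeholder input path
  .map(line => (line.split("\t")(0), 1L))          // placeholder key extraction

// The second argument is the number of reduce tasks; more (smaller) partitions
// keep any single task from holding too much data in memory.
val counts = pairs.reduceByKey(_ + _, 2048)
counts.saveAsTextFile("hdfs:///data/output")       // placeholder output path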

Cheers,

Sean


On Jan 22, 2015, at 3:15 PM, Kane Kim kane.ist...@gmail.com wrote:

I'm trying to process a large dataset; mapping/filtering works ok, but
as soon as I try to reduceByKey, I get out of memory errors:

http://pastebin.com/70M5d0Bn

Any ideas how I can fix that?

Thanks.





Re: Kafka version dependency in Spark 1.2

2014-11-10 Thread Sean McNamara
 Can the 0.8.1.1 client still talk to 0.8.0 versions of Kafka

Yes it can.

0.8.1 is fully compatible with 0.8.  It is buried on this page: 
http://kafka.apache.org/documentation.html

In addition to the pom version bump, SPARK-2492 would bring the kafka streaming 
receiver (which was originally based on kafka 0.7) in line with kafka 0.8:
https://issues.apache.org/jira/browse/SPARK-2492
https://github.com/apache/spark/pull/1420

I will soon test that PR on a spark+yarn cluster.

Cheers,

Sean


On Nov 10, 2014, at 11:58 AM, Matei Zaharia matei.zaha...@gmail.com wrote:

Just curious, what are the pros and cons of this? Can the 0.8.1.1 client still 
talk to 0.8.0 versions of Kafka, or do you need it to match your Kafka version 
exactly?

Matei

On Nov 10, 2014, at 9:48 AM, Bhaskar Dutta bhas...@gmail.com wrote:

Hi,

Is there any plan to bump the Kafka version dependency in Spark 1.2 from 0.8.0 
to 0.8.1.1?

Current dependency is still on Kafka 0.8.0
https://github.com/apache/spark/blob/branch-1.2/external/kafka/pom.xml

Thanks
Bhaskie




Re: foreachPartition and task status

2014-10-14 Thread Sean McNamara
Are you using spark streaming?

On Oct 14, 2014, at 10:35 AM, Salman Haq sal...@revmetrix.com wrote:

 Hi,
 
 In my application, I am successfully using foreachPartition to write large 
 amounts of data into a Cassandra database.
 
 What is the recommended practice if the application wants to know that the 
 tasks have completed for all partitions?
 
 Thanks,
 Salman
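
 For context, the write pattern described above looks roughly like this (CassandraClient and its methods are stand-ins for whatever driver is actually in use); because foreachPartition is an action, the call only returns after every partition's task has finished:

import org.apache.spark.rdd.RDD

// Placeholder for a real driver/connector.
trait CassandraClient {
  def insert(row: (String, String)): Unit
  def close(): Unit
}

def writeAll(data: RDD[(String, String)], connect: () => CassandraClient): Unit = {
  // foreachPartition is an action: it blocks until all partitions have been written.
  data.foreachPartition { rows =>
    val client = connect()           // one connection per partition, not per row
    try rows.foreach(client.insert)
    finally client.close()
  }
}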





Re: Spark Streaming KafkaUtils Issue

2014-10-10 Thread Sean McNamara
Would you mind sharing the code leading to your createStream?  Are you also 
setting group.id?

Thanks,

Sean


On Oct 10, 2014, at 4:31 PM, Abraham Jacob abe.jac...@gmail.com wrote:

 Hi Folks,
 
 I am seeing some strange behavior when using the Spark Kafka connector in 
 Spark streaming. 
 
 I have a Kafka topic which has 8 partitions. I have a kafka producer that 
 pumps some messages into this topic.
 
 On the consumer side I have a spark streaming application that has 8 
 executors on 8 worker nodes and 8 ReceiverInputDStream with the same kafka 
 group id connected to the 8 partitions I have for the topic. Also the kafka 
 consumer property auto.offset.reset is set to smallest.
 
 
 Now here is the sequence of steps - 
 
 (1) I start the spark streaming app.
 (2) Start the producer.
 
 At this point I see the messages that are being pumped from the producer in 
 Spark Streaming.  Then I - 
 
 (1) Stopped the producer.
 (2) Waited for all the messages to be consumed.
 (3) Stopped the spark streaming app.
 
 Now when I restart the spark streaming app (note - the producer is still down 
 and no messages are being pumped into the topic) - I observe the following - 
 
 (1) Spark Streaming starts reading from each partition right from the 
 beginning.
 
 
 This is not what I was expecting. I was expecting the consumers started by 
 spark streaming to start from where they left off.
 
 Is my assumption not correct that the consumers (the kafka/spark connector) 
 should start reading from the topic where they last left off...?
 
 Has anyone else seen this behavior? Is there a way to make it such that it 
 starts from where it left off?
 
 Regards,
 - Abraham





Re: Spark Streaming KafkaUtils Issue

2014-10-10 Thread Sean McNamara
How long do you let the consumers run for?  Is it less than 60 seconds by 
chance?  auto.commit.interval.ms defaults to 60000 (60 seconds).  If so, that 
may explain why you are seeing that behavior.

Cheers,

Sean


On Oct 10, 2014, at 4:47 PM, Abraham Jacob abe.jac...@gmail.com wrote:

Sure... I do set the group.id for all the consumers to be the 
same. Here is the code ---

SparkConf sparkConf = new SparkConf().setMaster("yarn-cluster").setAppName("Streaming WordCount");
sparkConf.set("spark.shuffle.manager", "SORT");
sparkConf.set("spark.streaming.unpersist", "true");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(1000));
Map<String, String> kafkaConf = new HashMap<String, String>();
kafkaConf.put("zookeeper.connect", zookeeper);
kafkaConf.put("group.id", consumerGrp);
kafkaConf.put("auto.offset.reset", "smallest");
kafkaConf.put("zookeeper.connection.timeout.ms", "1000");
kafkaConf.put("rebalance.max.retries", "4");
kafkaConf.put("rebalance.backoff.ms", "3000");
Map<String, Integer> topicMap = new HashMap<String, Integer>();
topicMap.put(topic, 1);
List<JavaPairDStream<byte[], String>> kafkaStreams = new ArrayList<JavaPairDStream<byte[], String>>();
for (int i = 0; i < numPartitions; i++) {
  kafkaStreams.add(KafkaUtils.createStream(jssc, byte[].class, String.class,
      DefaultDecoder.class, PayloadDeSerializer.class,
      kafkaConf, topicMap, StorageLevel.MEMORY_ONLY_SER()).mapToPair(
        new PairFunction<Tuple2<byte[], String>, byte[], String>() {

          private static final long serialVersionUID = -1936810126415608167L;

          public Tuple2<byte[], String> call(Tuple2<byte[], String> tuple2) throws Exception {
            return tuple2;
          }
        }
      )
  );
}


JavaPairDStream<byte[], String> unifiedStream;
if (kafkaStreams.size() > 1) {
  unifiedStream = jssc.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
} else {
  unifiedStream = kafkaStreams.get(0);
}
unifiedStream.print();
jssc.start();
jssc.awaitTermination();


-abe


On Fri, Oct 10, 2014 at 3:37 PM, Sean McNamara 
sean.mcnam...@webtrends.com wrote:
Would you mind sharing the code leading to your createStream?  Are you also 
setting group.id?

Thanks,

Sean


On Oct 10, 2014, at 4:31 PM, Abraham Jacob abe.jac...@gmail.com wrote:

 Hi Folks,

 I am seeing some strange behavior when using the Spark Kafka connector in 
 Spark streaming.

 I have a Kafka topic which has 8 partitions. I have a kafka producer that 
 pumps some messages into this topic.

 On the consumer side I have a spark streaming application that has 8 
 executors on 8 worker nodes and 8 ReceiverInputDStream with the same kafka 
 group id connected to the 8 partitions I have for the topic. Also the kafka 
 consumer property auto.offset.reset is set to smallest.


 Now here is the sequence of steps -

 (1) I start the spark streaming app.
 (2) Start the producer.

 At this point I see the messages that are being pumped from the producer in 
 Spark Streaming.  Then I -

 (1) Stopped the producer.
 (2) Waited for all the messages to be consumed.
 (3) Stopped the spark streaming app.

 Now when I restart the spark streaming app (note - the producer is still down 
 and no messages are being pumped into the topic) - I observe the following -

 (1) Spark Streaming starts reading from each partition right from the 
 beginning.


 This is not what I was expecting. I was expecting the consumers started by 
 spark streaming to start from where they left off.

 Is my assumption not correct that the consumers (the kafka/spark connector) 
 should start reading from the topic where they last left off...?

 Has anyone else seen this behavior? Is there a way to make it such that it 
 starts from where it left off?

 Regards,
 - Abraham




--
~



Re: Spark Streaming KafkaUtils Issue

2014-10-10 Thread Sean McNamara
This jira and comment sums up the issue:
https://issues.apache.org/jira/browse/SPARK-2492?focusedCommentId=14069708page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14069708

Basically the offset param was renamed and had slightly different semantics 
between kafka 0.7 and 0.8.  Also it was useful because earlier versions of the 
spark streaming receiver could be overwhelmed when a streaming job had been down 
for a period of time.

I think this PR quite nicely addresses the issue:
https://github.com/apache/spark/pull/1420


Best,

Sean


On Oct 10, 2014, at 6:48 PM, Abraham Jacob abe.jac...@gmail.com wrote:

Thanks Jerry, So, from what I can understand from the code, if I leave out 
auto.offset.reset, it should theoretically read from the last commit point... 
Correct?

-abe

On Fri, Oct 10, 2014 at 5:40 PM, Shao, Saisai 
saisai.s...@intel.com wrote:
Hi Abraham,

You are correct, the “auto.offset.reset” behavior in KafkaReceiver is different 
from original Kafka’s semantics. If you set this configuration, KafkaReceiver will 
clean the related offsets immediately, but for Kafka this configuration is just a hint 
which is effective only when the offset is out-of-range. So you will always 
read data from the beginning if you set it to “smallest”; otherwise, if you set it to 
“largest”, you will always get data from the end immediately.

There’s a JIRA and PR to follow this, but still not merged to the master, you 
can check to see it (https://issues.apache.org/jira/browse/SPARK-2492).

Thanks
Jerry

From: Abraham Jacob [mailto:abe.jac...@gmail.com]
Sent: Saturday, October 11, 2014 6:57 AM
To: Sean McNamara
Cc: user@spark.apache.org
Subject: Re: Spark Streaming KafkaUtils Issue

Probably this is the issue -

http://www.michael-noll.com/blog/2014/10/01/kafka-spark-streaming-integration-example-tutorial/


• Spark’s usage of the Kafka consumer parameter auto.offset.reset 
(http://kafka.apache.org/documentation.html#consumerconfigs) is different from 
Kafka’s semantics. In Kafka, the behavior of setting auto.offset.reset to 
“smallest” is that the consumer will automatically reset the offset to the 
smallest offset when a) there is no existing offset stored in ZooKeeper or b) 
there is an existing offset but it is out of range. Spark however will always 
remove existing offsets and then start all the way from zero again. This means 
whenever you restart your application with auto.offset.reset = “smallest”, your 
application will completely re-process all available Kafka data. Doh! See this 
discussion (http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-and-the-spark-shell-tp3347p3387.html) 
and that discussion (http://markmail.org/message/257a5l3oqyftsjxj).

Hmm interesting... Wondering what happens if I set it as largest...?


On Fri, Oct 10, 2014 at 3:47 PM, Abraham Jacob abe.jac...@gmail.com wrote:
Sure... I do set the group.id for all the consumers to be the 
same. Here is the code ---

SparkConf sparkConf = new SparkConf().setMaster("yarn-cluster").setAppName("Streaming WordCount");
sparkConf.set("spark.shuffle.manager", "SORT");
sparkConf.set("spark.streaming.unpersist", "true");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(1000));
Map<String, String> kafkaConf = new HashMap<String, String>();
kafkaConf.put("zookeeper.connect", zookeeper);
kafkaConf.put("group.id", consumerGrp);
kafkaConf.put("auto.offset.reset", "smallest");
kafkaConf.put("zookeeper.connection.timeout.ms", "1000");
kafkaConf.put("rebalance.max.retries", "4");
kafkaConf.put("rebalance.backoff.ms", "3000");
Map<String, Integer> topicMap = new HashMap<String, Integer>();
topicMap.put(topic, 1);
List<JavaPairDStream<byte[], String>> kafkaStreams = new ArrayList<JavaPairDStream<byte[], String>>();
for (int i = 0; i < numPartitions; i++) {
  kafkaStreams.add(KafkaUtils.createStream(jssc, byte[].class, String.class,
      DefaultDecoder.class, PayloadDeSerializer.class,
      kafkaConf, topicMap, StorageLevel.MEMORY_ONLY_SER()).mapToPair(
        new PairFunction<Tuple2<byte[], String>, byte[], String>() {

          private static final long serialVersionUID = -1936810126415608167L;

          public Tuple2<byte[], String> call(Tuple2<byte[], String> tuple2) throws Exception {
            return tuple2;
          }
        }
      )
  );
}


JavaPairDStream<byte[], String> unifiedStream;
if (kafkaStreams.size() > 1) {
  unifiedStream = jssc.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
} else {
  unifiedStream = kafkaStreams.get(0);
}
unifiedStream.print();
jssc.start();
jssc.awaitTermination();


-abe


On Fri, Oct 10, 2014 at 3:37 PM, Sean McNamara 
sean.mcnam...@webtrends.com wrote:
Would you mind sharing the code leading to your createStream?  Are you also 
setting group.id?

Thanks,

Sean


On Oct 10, 2014

Re: balancing RDDs

2014-06-25 Thread Sean McNamara
Yep, exactly!  I’m not sure how complicated it would be to pull off.  If someone 
wouldn’t mind pointing me in the right direction, I would be happy to look into 
it and contribute this functionality.  I imagine it would be implemented in the 
scheduler codebase, and there would presumably be some sort of rebalance 
configuration property to enable it?

Does anyone else have any thoughts on this?

Cheers,

Sean


On Jun 24, 2014, at 4:41 PM, Mayur Rustagi mayur.rust...@gmail.com wrote:

 This would be really useful, especially for Shark, where a shift in
 partitioning affects all subsequent queries unless task scheduling time
 beats spark.locality.wait. It can cause overall low performance for all
 subsequent tasks.
 
 Mayur Rustagi
 Ph: +1 (760) 203 3257
 http://www.sigmoidanalytics.com
 @mayur_rustagi https://twitter.com/mayur_rustagi
 
 
 
 On Tue, Jun 24, 2014 at 4:10 AM, Sean McNamara sean.mcnam...@webtrends.com
 wrote:
 
 We have a use case where we’d like something to execute once on each node
 and I thought it would be good to ask here.
 
 Currently we achieve this by setting the parallelism to the number of
 nodes and use a mod partitioner:
 
 val balancedRdd = sc.parallelize(
(0 until Settings.parallelism)
.map(id => id -> Settings.settings)
  ).partitionBy(new ModPartitioner(Settings.parallelism))
  .cache()
 
 
 This works great except in two instances where it can become unbalanced:
 
 1. if a worker is restarted or dies, the partition will move to a
 different node (one of the nodes will run two tasks).  When the worker
 rejoins, is there a way to have a partition move back over to the newly
 restarted worker so that it’s balanced again?
 
 2. drivers need to be started in a staggered fashion, otherwise one driver
 can launch two tasks on one set of workers, and the other driver will do
 the same with the other set.  Are there any scheduler/config semantics so
 that each driver will take one (and only one) core from *each* node?
 
 
 Thanks
 
 Sean
 
 
 
 
 
 
 



balancing RDDs

2014-06-23 Thread Sean McNamara
We have a use case where we’d like something to execute once on each node and I 
thought it would be good to ask here.

Currently we achieve this by setting the parallelism to the number of nodes and 
use a mod partitioner:

val balancedRdd = sc.parallelize(
(0 until Settings.parallelism)
.map(id => id -> Settings.settings)
  ).partitionBy(new ModPartitioner(Settings.parallelism))
  .cache()


This works great except in two instances where it can become unbalanced:

1. if a worker is restarted or dies, the partition will move to a different 
node (one of the nodes will run two tasks).  When the worker rejoins, is there 
a way to have a partition move back over to the newly restarted worker so that 
it’s balanced again?

2. drivers need to be started in a staggered fashion, otherwise one driver can 
launch two tasks on one set of workers, and the other driver will do the same 
with the other set.  Are there any scheduler/config semantics so that each 
driver will take one (and only one) core from *each* node?
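
For reference, ModPartitioner above is our own partitioner; a minimal sketch of what it could look like (assuming the keys are the Int ids from the parallelize call):

import org.apache.spark.Partitioner

class ModPartitioner(partitions: Int) extends Partitioner {
  require(partitions > 0)

  override def numPartitions: Int = partitions

  // Route each Int id to id mod partitions (kept non-negative).
  override def getPartition(key: Any): Int = key match {
    case id: Int => ((id % partitions) + partitions) % partitions
    case _       => 0
  }

  override def equals(other: Any): Boolean = other match {
    case p: ModPartitioner => p.numPartitions == numPartitions
    case _                 => false
  }

  override def hashCode: Int = numPartitions
}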


Thanks

Sean