Re: Why Kryo Serializer is slower than Java Serializer in TeraSort

2015-07-05 Thread Will Briggs
That code doesn't appear to be registering classes with Kryo, which means the 
fully-qualified classname is stored with every Kryo record. The Spark 
documentation has more on this: 
https://spark.apache.org/docs/latest/tuning.html#data-serialization
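For example, a minimal sketch of registering classes up front (Spark 1.2+; the classes listed here are placeholders for whatever record types the job actually shuffles):

```
import org.apache.spark.SparkConf

// Sketch: with registration, Kryo writes a small class id per record instead of
// the fully-qualified class name. The classes below are only illustrative.
val conf = new SparkConf()
  .setAppName("TeraSort")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerKryoClasses(Array(classOf[Array[Byte]], classOf[Array[Array[Byte]]]))
```

The same thing can be done without code changes via the spark.kryo.classesToRegister configuration property.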

Regards,
Will

On July 5, 2015, at 2:31 AM, Gavin Liu  wrote:

Hi,

I am using the TeraSort benchmark from ehiggs's branch
(https://github.com/ehiggs/spark-terasort). Then I noticed that in
TeraSort.scala it is using the Kryo serializer, so I made a small change from
"org.apache.spark.serializer.KryoSerializer" to
"org.apache.spark.serializer.JavaSerializer" to see the time difference.

Curiously, using the Java serializer is much quicker than using Kryo, and no
error is reported when I run the program. Here are the records from the history
server; the first one is Kryo, the second one is the Java default.

1. [Kryo run: history server screenshot not included in the archive]

2. [Java default run: history server screenshot not included in the archive]

I am wondering if I did something wrong or there is any other reason behind
this result.

Thanks for any help,
Gavin



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Why-Kryo-Serializer-is-slower-than-Java-Serializer-in-TeraSort-tp23621.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Kryo fails to serialise output

2015-07-03 Thread Will Briggs
Kryo serialization is used internally by Spark for spilling or shuffling 
intermediate results, not for writing out an RDD as an action. Look at Sandy 
Ryza's examples for some hints on how to do this: 
https://github.com/sryza/simplesparkavroapp
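For example, a sketch along the lines of that project, writing the tweets RDD from the code below as an Avro container file instead of using saveAsObjectFile (this assumes the avro-mapred "hadoop2" dependency and that Tweet is the generated SpecificRecord class; the output path is illustrative):

```
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.{AvroJob, AvroKeyOutputFormat}
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapreduce.Job

// saveAsObjectFile uses Java serialization (hence the NotSerializableException);
// writing an Avro data file through the Hadoop output format avoids it entirely.
val job = Job.getInstance()
AvroJob.setOutputKeySchema(job, Tweet.getClassSchema())

tweets
  .map(t => (new AvroKey[Tweet](t), NullWritable.get()))
  .saveAsNewAPIHadoopFile(
    "file:///tmp/spark-avro",
    classOf[AvroKey[Tweet]],
    classOf[NullWritable],
    classOf[AvroKeyOutputFormat[Tweet]],
    job.getConfiguration)
```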

Regards,
Will

On July 3, 2015, at 2:45 AM, Dominik Hübner  wrote:

I have a rather simple avro schema to serialize Tweets (message, username, 
timestamp).
Kryo and twitter chill are used to do so.

For my dev environment the Spark context is configured as below

val conf: SparkConf = new SparkConf()
conf.setAppName("kryo_test")
conf.setMaster("local[4]")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.kryo.registrator", "co.feeb.TweetRegistrator")

Serialization is setup with

override def registerClasses(kryo: Kryo): Unit = {
  kryo.register(classOf[Tweet],
    AvroSerializer.SpecificRecordBinarySerializer[Tweet])
}

(This method gets called)


Using this configuration to persist some object fails with 
java.io.NotSerializableException: co.feeb.avro.Tweet 
(which seems to be ok as this class is not Serializable)

I used the following code:

val ctx: SparkContext = new SparkContext(conf)
val tweets: RDD[Tweet] = ctx.parallelize(List(
new Tweet("a", "b", 1L),
new Tweet("c", "d", 2L),
new Tweet("e", "f", 3L)
  )
)

tweets.saveAsObjectFile("file:///tmp/spark")

Using saveAsTextFile works, but the persisted files are not binary but JSON:

cat /tmp/spark/part-0
{"username": "a", "text": "b", "timestamp": 1}
{"username": "c", "text": "d", "timestamp": 2}
{"username": "e", "text": "f", "timestamp": 3}

Is this intended behaviour, a configuration issue, avro serialisation not 
working in local mode or something else?





-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Using Accumulators in Streaming

2015-06-21 Thread Will Briggs
It sounds like accumulators are not necessary in Spark Streaming - see this 
post ( 
http://apache-spark-user-list.1001560.n3.nabble.com/Shared-variable-in-Spark-Streaming-td11762.html)
 for more details.
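For what it is worth, a second context should not be needed either way: the StreamingContext exposes its underlying SparkContext, which can create the accumulator. A minimal sketch (the source, names, and batch interval are only illustrative):

```
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("streaming-accumulator")
val ssc = new StreamingContext(conf, Seconds(10))

// The StreamingContext wraps a SparkContext; no second context is required.
val errorCount = ssc.sparkContext.accumulator(0L, "errorCount")

ssc.socketTextStream("localhost", 9999).foreachRDD { rdd =>
  rdd.foreach { line =>
    if (line.contains("ERROR")) errorCount += 1L
  }
}

ssc.start()
ssc.awaitTermination()
```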

On June 21, 2015, at 7:31 PM, anshu shukla  wrote:

In Spark Streaming, we already have a StreamingContext, which does not allow us
to create accumulators. We have to get a SparkContext to initialize the
accumulator value.

But having two Spark contexts will not solve the problem.


Please Help !!


-- 

Thanks & Regards,
Anshu Shukla



Re: Submitting Spark Applications using Spark Submit

2015-06-16 Thread Will Briggs
If this is research-only, and you don't want to have to worry about updating 
the jars installed by default on the cluster, you can add your custom Spark jar 
using the "spark.driver.extraLibraryPath" configuration property when running 
spark-submit, and then use the experimental " spark.driver.userClassPathFirst" 
config to force it to use yours.

See here for more details and options: 
https://spark.apache.org/docs/1.4.0/configuration.html

On June 16, 2015, at 10:12 PM, Raghav Shankar  wrote:

I made the change so that I could implement top() using treeReduce(). A member
on here suggested I make the change in RDD.scala to accomplish that. Also, this
is for a research project, not for commercial use.

So, any advice on how I can get spark-submit to use my custom-built jars would
be very useful.

Thanks,
Raghav

> On Jun 16, 2015, at 6:57 PM, Will Briggs  wrote:
> 
> In general, you should avoid making direct changes to the Spark source code. 
> If you are using Scala, you can seamlessly blend your own methods on top of 
> the base RDDs using implicit conversions.
> 
> Regards,
> Will
> 
> On June 16, 2015, at 7:53 PM, raggy  wrote:
> 
> I am trying to submit a Spark application using the command line. I used the
> spark-submit command to do so. I initially set up my Spark application in
> Eclipse and have been making changes there. I recently obtained my own
> version of the Spark source code and added a new method to RDD.scala. I
> created a new Spark core jar using mvn and added it to my Eclipse build
> path. My application ran perfectly fine.
> 
> Now, I would like to submit it through the command line. I submitted my
> application like this:
> 
> bin/spark-submit --master local[2] --class "SimpleApp"
> /Users/XXX/Desktop/spark2.jar
> 
> The spark-submit command is within the spark project that I modified by
> adding new methods.
> When I do so, I get this error:
> 
> java.lang.NoSuchMethodError: org.apache.spark.rdd.RDD.treeTop(ILscala/math/Ordering;)Ljava/lang/Object;
>   at SimpleApp$.main(SimpleApp.scala:12)
>   at SimpleApp.main(SimpleApp.scala)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>   at java.lang.reflect.Method.invoke(Method.java:597)
>   at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
>   at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
>   at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
>   at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
>   at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> 
> When I use spark-submit, where does the jar come from? How do I make sure it
> uses the jars that I have built?
> 
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Submitting-Spark-Applications-using-Spark-Submit-tp23352.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 



Re: Spark or Storm

2015-06-16 Thread Will Briggs
The programming models for the two frameworks are conceptually rather 
different; I haven't worked with Storm for quite some time, but based on my old 
experience with it, I would equate Spark Streaming more with Storm's Trident 
API, rather than with the raw Bolt API. Even then, there are significant 
differences, but it's a bit closer.

If you can share your use case, we might be able to provide better guidance.

Regards,
Will

On June 16, 2015, at 9:46 PM, asoni.le...@gmail.com wrote:

Hi All,

I am evaluating Spark (Spark Streaming) vs. Storm, and I am not able to see
what the equivalent of Storm's Bolt is in Spark.

Any help on this would be appreciated.

Thanks ,
Ashish
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Submitting Spark Applications using Spark Submit

2015-06-16 Thread Will Briggs
In general, you should avoid making direct changes to the Spark source code. If 
you are using Scala, you can seamlessly blend your own methods on top of the 
base RDDs using implicit conversions.
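For the treeReduce-based top() specifically, an implicit class can add it without touching RDD.scala. A rough sketch (treeTop is a hypothetical name and not a drop-in replacement for the built-in top()):

```
import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

// Sketch: per-partition top-k followed by a treeReduce merge, exposed through an
// implicit class so no change to Spark itself is needed.
object RDDExtensions {
  implicit class TreeTopOps[T: ClassTag: Ordering](rdd: RDD[T]) {
    def treeTop(k: Int): Array[T] = {
      val ord = implicitly[Ordering[T]]
      rdd
        // keep only the k largest elements of each partition
        .mapPartitions(it => Iterator.single(it.toArray.sorted(ord.reverse).take(k)))
        // merge the partial results pairwise in a tree pattern (assumes a non-empty RDD)
        .treeReduce((a, b) => (a ++ b).sorted(ord.reverse).take(k))
    }
  }
}

// Usage in the application:
//   import RDDExtensions._
//   val top10 = sc.parallelize(1 to 1000000).treeTop(10)
```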

Regards,
Will

On June 16, 2015, at 7:53 PM, raggy  wrote:

I am trying to submit a Spark application using the command line. I used the
spark-submit command to do so. I initially set up my Spark application in
Eclipse and have been making changes there. I recently obtained my own
version of the Spark source code and added a new method to RDD.scala. I
created a new Spark core jar using mvn and added it to my Eclipse build
path. My application ran perfectly fine.

Now, I would like to submit it through the command line. I submitted my
application like this:

bin/spark-submit --master local[2] --class "SimpleApp"
/Users/XXX/Desktop/spark2.jar

The spark-submit command is within the spark project that I modified by
adding new methods.
When I do so, I get this error:

java.lang.NoSuchMethodError: org.apache.spark.rdd.RDD.treeTop(ILscala/math/Ordering;)Ljava/lang/Object;
    at SimpleApp$.main(SimpleApp.scala:12)
    at SimpleApp.main(SimpleApp.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

When I use spark-submit, where does the jar come from? How do I make sure it
uses the jars that I have built?




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Submitting-Spark-Applications-using-Spark-Submit-tp23352.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: creation of RDD from a Tree

2015-06-14 Thread Will Briggs
If you are working on large structures, you probably want to look at the GraphX 
extension to Spark: 
https://spark.apache.org/docs/latest/graphx-programming-guide.html
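If the tree is only used to decide which chunk an object belongs to, another option is to key each object by its leaf and group, rather than building up chunk RDDs incrementally. A toy sketch (assignLeaf stands in for running an object through your binary tree, and f for your per-chunk processing):

```
import org.apache.spark.{SparkConf, SparkContext}

object ChunkByLeaf {
  def assignLeaf(x: Double): Int = (x * 4).toInt      // hypothetical: 4 leaves

  def f(chunk: Iterable[Double]): Double = chunk.sum  // hypothetical per-chunk work

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("chunk-by-leaf"))
    val objects = sc.parallelize(Seq(0.1, 0.3, 0.55, 0.72, 0.9))
    val results = objects
      .map(o => (assignLeaf(o), o))   // tag each object with its leaf id
      .groupByKey()                   // one group per leaf = one chunk
      .map { case (leaf, chunk) => (leaf, f(chunk)) }
    results.collect().foreach(println)
    sc.stop()
  }
}
```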

On June 14, 2015, at 10:50 AM, lisp  wrote:

Hi there,

I have a large number of objects, which I have to partition into chunks with
the help of a binary tree: after each object has been run through the tree,
the leaves of that tree contain the chunks. Next I have to process each of
those chunks in the same way with a function f(chunk). So I thought that if I
could make the list of chunks into an RDD listOfChunks, I could use Spark by
calling listOfChunks.map(f) and do the processing in parallel.

How would you recommend I create the RDD? Is it possible to start with an RDD
that is a list of empty chunks and then add my objects one by one to the
chunks they belong to? Or would you recommend something else?

Thanks!





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/creation-of-RDD-from-a-Tree-tp23310.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Dataframe Write : Tables created with SQLContext must be TEMPORARY. Use a HiveContext instead.

2015-06-13 Thread Will Briggs
The context that is created by spark-shell is actually an instance of 
HiveContext. If you want to use it programmatically in your driver, you need to 
make sure that your context is a HiveContext, and not a SQLContext.

https://spark.apache.org/docs/latest/sql-programming-guide.html#hive-tables
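For example, a minimal sketch of constructing a HiveContext in a standalone driver (mirroring what spark-shell already gives you):

```
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

// spark-shell's sqlContext is already a HiveContext; a standalone driver must
// build one explicitly before using Hive features such as saveAsTable on
// non-temporary tables.
val sc = new SparkContext(new SparkConf().setAppName("hive-orc-write"))
val hiveContext = new HiveContext(sc)
import hiveContext.implicits._
```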

Hope this helps,
Will

On June 13, 2015, at 3:36 PM, pth001  wrote:

Hi,

I am using Spark 0.14. I am trying to insert data into a Hive table (in ORC
format) from a DataFrame.

partitionedTestDF.write.format("org.apache.spark.sql.hive.orc.DefaultSource")
.mode(org.apache.spark.sql.SaveMode.Append).partitionBy("zone","z","year","month").saveAsTable("testorc")

When this job is submitted via spark-submit, I get:
Exception in thread "main" java.lang.RuntimeException: Tables created 
with SQLContext must be TEMPORARY. Use a HiveContext instead

But the job works fine on spark-shell. What can be wrong?

BR,
Patcharee

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to split log data into different files according to severity

2015-06-13 Thread Will Briggs
Check out this recent post by Cheng Lian regarding dynamic partitioning in
Spark 1.4: https://www.mail-archive.com/user@spark.apache.org/msg30204.html
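A minimal sketch of that approach, assuming Spark 1.4, input lines of the form "[LEVEL] message", and Parquet output (paths and names are illustrative):

```
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sc = new SparkContext(new SparkConf().setAppName("split-by-severity"))
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

// Parse "[ERROR] log1" into (severity, line) pairs.
val logs = sc.textFile("hdfs:///logs/input")
  .map { line =>
    val severity = line.substring(1, line.indexOf(']'))
    (severity, line)
  }
  .toDF("severity", "line")

// Dynamic partitioning: one output directory per severity value,
// e.g. .../severity=ERROR/ and .../severity=INFO/ (written as Parquet here).
logs.write.partitionBy("severity").parquet("hdfs:///logs/by_severity")
```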

On June 13, 2015, at 5:41 AM, Hao Wang  wrote:

Hi,


I have a bunch of large log files on Hadoop. Each line contains a log message and its
severity. Is there a way I can use Spark to split the entire data set into
different files on Hadoop according to the severity field? Thanks. Below is an
example of the input and output.


Input:

[ERROR] log1

[INFO] log2

[ERROR] log3

[INFO] log4


Output:

error_file

[ERROR] log1

[ERROR] log3


info_file

[INFO] log2

[INFO] log4



Best,

Hao Wang



Re: Spark distinct() returns incorrect results for some types?

2015-06-11 Thread Will Briggs
To be fair, this is a long-standing issue due to optimizations for object reuse 
in the Hadoop API, and isn't necessarily a failing in Spark - see this blog 
post 
(https://cornercases.wordpress.com/2011/08/18/hadoop-object-reuse-pitfall-all-my-reducer-values-are-the-same/)
 from 2011 documenting a similar issue.



On June 11, 2015, at 3:17 PM, Sean Owen  wrote:

Yep you need to use a transformation of the raw value; use toString for 
example. 
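For instance, in Scala the fix is a one-line map before distinct(); a sketch of the equivalent pipeline (sc and idList correspond to the snippet below, and the Java version would map each Text to a String the same way):

```
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.lib.NLineInputFormat

// Copy each reused Text instance into an immutable String before distinct(),
// so Hadoop's object reuse cannot collapse values.
val idListData = sc
  .hadoopFile(idList, classOf[NLineInputFormat], classOf[LongWritable], classOf[Text])
  .values
  .map(_.toString)
  .distinct()
```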


On Thu, Jun 11, 2015, 8:54 PM Crystal Xing  wrote:

That is a little scary.
So you mean that, in general, we shouldn't use Hadoop's Writable as a key in an RDD?


Zheng zheng


On Thu, Jun 11, 2015 at 6:44 PM, Sean Owen  wrote:

Guess: it has something to do with the Text object being reused by Hadoop? You 
can't in general keep around refs to them since they change. So you may have a 
bunch of copies of one object at the end that become just one in each 
partition. 


On Thu, Jun 11, 2015, 8:36 PM Crystal Xing  wrote:

I load a list of IDs from a text file using NLineInputFormat, and when I do
distinct(), it returns an incorrect number.

JavaRDD<Text> idListData = jvc
        .hadoopFile(idList, NLineInputFormat.class,
                LongWritable.class, Text.class).values().distinct();


I should have 7000K distinct values; however, it only returns 7000 values,
which is the same as the number of tasks. The type I am using is
org.apache.hadoop.io.Text.



However, if I switch to using String instead of Text, it works correctly.

I think the Text class should have correct implementations of equals() and
hashCode(), since it is a Hadoop class.

Does anyone have clue what is going on? 

I am using spark 1.2. 

Zheng zheng






Re: write multiple outputs by key

2015-06-06 Thread Will Briggs
I believe groupByKey currently requires that all items for a specific key fit
into a single executor's memory:
http://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html

This previous discussion has some pointers if you must use groupByKey, 
including adding a low-cardinality hash to your key: 
http://apache-spark-user-list.1001560.n3.nabble.com/Understanding-RDD-GroupBy-OutOfMemory-Exceptions-td11427.html

Another option I didn't see mentioned would be to persist / cache the initial 
RDD, calculate the set of distinct key values out of it, and then derive a set 
of filtered RDDs from the cached dataset, one for each key. For this to work, 
your set of unique keys would need to fit into your driver's memory.
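A sketch of that last approach, using the varWRDD from the code below (keyOf is a hypothetical function extracting the (zone, z, year, month) key, and the output path/format is only illustrative):

```
import org.apache.spark.storage.StorageLevel

// Persist once, then derive one filtered RDD per distinct key.
val keyed = varWRDD.map(r => (keyOf(r), r)).persist(StorageLevel.MEMORY_AND_DISK)

// The set of distinct keys must fit in the driver's memory.
val keys = keyed.keys.distinct().collect()

keys.foreach { k =>
  keyed.filter { case (key, _) => key == k }
    .values
    .saveAsTextFile(s"/tmp/output/key=$k")
}

keyed.unpersist()
```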

Regards,
Will

On June 6, 2015, at 11:07 AM, patcharee  wrote:

Hi,

How can I write to multiple outputs for each key? I tried to create a custom
partitioner and to set the number of partitions, but it does not work. Only a
few tasks/partitions (equal to the number of all key combinations) get large
datasets; the data is not split across all tasks/partitions. The job failed
because those few tasks handled datasets that were far too large. Below is my
code snippet.

val varWFlatRDD =
  varWRDD.map(FlatMapUtilClass().flatKeyFromWrf).groupByKey() // key is (zone, z, year, month)
    .foreach(x => {
      val z = x._1._1
      val year = x._1._2
      val month = x._1._3
      val df_table_4dim = x._2.toList.toDF()
      df_table_4dim.registerTempTable("table_4Dim")
      hiveContext.sql("INSERT OVERWRITE table 4dim partition (zone=" + ZONE + ",z=" + z + ",year=" + year + ",month=" + month + ") " +
        "select date, hh, x, y, height, u, v, w, ph, phb, t, p, pb, qvapor, qgraup, qnice, qnrain, tke_pbl, el_pbl from table_4Dim");
    })


 From the Spark history UI, at groupByKey there are > 1000 tasks (equal to the
parent's number of partitions). At foreach there are > 1000 tasks as well, but
only 50 tasks (the same as the number of all key combinations) get datasets.

How can I fix this problem? Any suggestions are appreciated.

BR,
Patcharee



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: SparkContext & Threading

2015-06-06 Thread Will Briggs
Hi Lee, it's actually not related to threading at all - you would still have 
the same problem even if you were using a single thread. See this section ( 
https://spark.apache.org/docs/latest/programming-guide.html#passing-functions-to-spark)
 of the Spark docs. 
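Concretely, the fix that guide describes is to keep the SparkContext field @transient and copy any field a closure needs into a local variable, so the lambdas never capture the enclosing instance. A rough sketch against the SecondRollup class posted later in this thread (trimmed; the per-partition work is a placeholder):

```
import org.apache.spark.SparkContext
import org.joda.time.DateTime
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector

class SecondRollup(@transient val sc: SparkContext,
                   connector: CassandraConnector,
                   scanPoint: DateTime) extends Runnable with Serializable {

  def run(): Unit = {
    // Local copies: the RDD closures below capture these vals rather than `this`,
    // so the (non-serializable) SparkContext field is never pulled into a task.
    val conn = connector
    val point = scanPoint

    sc.cassandraTable("alpha_test", "sensor_readings")
      .select("data")
      .where("timep = ?", point)
      .map(row => row.getString("data"))
      .foreachPartition { rows =>
        // executor-side Cassandra access through the captured local `conn`
        conn.withSessionDo { session =>
          rows.foreach(data => ()) // placeholder for the real per-row work
        }
      }
  }
}
```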

On June 5, 2015, at 5:12 PM, Lee McFadden  wrote:

On Fri, Jun 5, 2015 at 2:05 PM Will Briggs  wrote:

Your lambda expressions on the RDDs in the SecondRollup class are closing 
around the context, and Spark has special logic to ensure that all variables in 
a closure used on an RDD are Serializable - I hate linking to Quora, but 
there's a good explanation here: 
http://www.quora.com/What-does-Closure-cleaner-func-mean-in-Spark


Ah, I see!  So if I broke out the lambda expressions into a method on an object 
it would prevent this issue.  Essentially, "don't use lambda expressions when 
using threads".


Thanks again, I appreciate the help. 



Re: SparkContext & Threading

2015-06-05 Thread Will Briggs
Your lambda expressions on the RDDs in the SecondRollup class are closing 
around the context, and Spark has special logic to ensure that all variables in 
a closure used on an RDD are Serializable - I hate linking to Quora, but 
there's a good explanation here: 
http://www.quora.com/What-does-Closure-cleaner-func-mean-in-Spark


On June 5, 2015, at 4:14 PM, Lee McFadden  wrote:



On Fri, Jun 5, 2015 at 12:58 PM Marcelo Vanzin  wrote:

You didn't show the error so the only thing we can do is speculate. You're 
probably sending the object that's holding the SparkContext reference over  the 
network at some point (e.g. it's used by a task run in an executor), and that's 
why you were getting that exception.


Apologies - the full error is as follows.  All I did here was remove the 
@transient annotation from the sc variable in my class constructor.  In 
addition, the full code for the classes and launching process is included below.


Error traceback:

```

Exception in thread "pool-5-thread-1" java.lang.Error: org.apache.spark.SparkException: Task not serializable
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1148)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Task not serializable
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:1478)
        at org.apache.spark.rdd.RDD.map(RDD.scala:288)
        at io.icebrg.analytics.spark.SecondRollup.run(ConnTransforms.scala:33)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        ... 2 more
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
        ... 7 more

```


Code:

```

class SecondRollup(sc: SparkContext, connector: CassandraConnector, scanPoint: DateTime) extends Runnable with Serializable {

  def run {
    val conn = sc.cassandraTable("alpha_test", "sensor_readings")
      .select("data")
      .where("timep = ?", scanPoint)
      .where("sensorid IN ?", System.sensors)
      .map(r => Json.parse(r.getString("data")))
      .cache()

    conn.flatMap(AffectedIp.fromJson)
      .map(a => (AffectedIp.key(a), a))
      .reduceByKey(AffectedIp.reduce)
      .map(_._2)
      .map(a => AffectedIp.reduceWithCurrent(connector, a))
      .saveToCassandra("alpha_test", "affected_hosts")

    conn.flatMap(ServiceSummary.fromnJson)
      .map(s => (ServiceSummary.key(s), s))
      .reduceByKey(ServiceSummary.reduce)
      .map(_._2)
      .saveToCassandra("alpha_test", "service_summary_rollup")
  }
}

object Transforms {
  private val appNameBase = "Transforms%s"
  private val dtFormatter = DateTimeFormat.forPattern("MMddHH")

  def main(args: Array[String]) {
    if (args.size < 2) {
      println("""Usage: ConnTransforms  
             DateTime to start processing at. Format: MMddHH
               DateTime to end processing at.  Format: MMddHH""")
      sys.exit(1)
    }

    // withZoneRetainFields gives us a UTC time as specified on the command line.
    val start = dtFormatter.parseDateTime(args(0)).withZoneRetainFields(DateTimeZone.UTC)
    val end = dtFormatter.parseDateTime(args(1)).withZoneRetainFields(DateTimeZone.UTC)

    println("Processing rollups from %s to %s".format(start, end))

    // Create the spark context.
    val conf = new SparkConf()
      .setAppName(appNameBase.format("Test"))

    val connector = CassandraConnector(conf)

    val sc = new SparkContext(conf)

    // Set up the threadpool for running Jobs