Use of nscala-time within spark-shell

2015-02-16 Thread Hammam CHAMSI
Hi All,


Thanks in advance for your help. I have timestamps which I need
to convert to datetimes using Scala. A folder contains the three needed
jar files: joda-convert-1.5.jar, joda-time-2.4.jar and
nscala-time_2.11-1.8.0.jar.

Using the Scala REPL and adding the jars: scala -classpath *.jar

I can use nscala-time as follows:


scala> import com.github.nscala_time.time.Imports._

import com.github.nscala_time.time.Imports._


scala> import org.joda._

import org.joda._


scala> DateTime.now

res0: org.joda.time.DateTime = 2015-02-12T15:51:46.928+01:00


But when I try to use spark-shell:

ADD_JARS=/home/scala_test_class/nscala-time_2.11-1.8.0.jar,/home/scala_test_class/joda-time-2.4.jar,/home/scala_test_class/joda-convert-1.5.jar
 /usr/local/spark/bin/spark-shell --master local --driver-memory 2g 
--executor-memory 2g --executor-cores 1


It successfully imports the jars:


scala> import com.github.nscala_time.time.Imports._

import com.github.nscala_time.time.Imports._


scala> import org.joda._

import org.joda._


but it fails when using them:

scala> DateTime.now

java.lang.NoSuchMethodError: 
scala.Predef$.$conforms()Lscala/Predef$$less$colon$less;

at 
com.github.nscala_time.time.LowPriorityOrderingImplicits$class.ReadableInstantOrdering(Implicits.scala:69)

at 
com.github.nscala_time.time.Imports$.ReadableInstantOrdering(Imports.scala:20)

at 
com.github.nscala_time.time.OrderingImplicits$class.$init$(Implicits.scala:61)

at com.github.nscala_time.time.Imports$.<init>(Imports.scala:20)

at com.github.nscala_time.time.Imports$.<clinit>(Imports.scala)

at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:17)

at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:22)

at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:24)

at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:26)

at $iwC$$iwC$$iwC$$iwC.<init>(<console>:28)

at $iwC$$iwC$$iwC.<init>(<console>:30)

at $iwC$$iwC.<init>(<console>:32)

at $iwC.<init>(<console>:34)

at <init>(<console>:36)

at .<init>(<console>:40)

at .<clinit>(<console>)

at .<init>(<console>:7)

at .<clinit>(<console>)

at $print(<console>)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)

at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:606)

at 
org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852)

at 
org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1125)

at 
org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:674)

at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:705)

at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:669)

at 
org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:828)

at 
org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:873)

at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:785)

at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:628)

at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:636)

at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:641)

at 
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:968)

at 
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916)

at 
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916)

at 
scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)

at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:916)

at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1011)

at org.apache.spark.repl.Main$.main(Main.scala:31)

at org.apache.spark.repl.Main.main(Main.scala)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)

at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:606)

at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)

at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)

at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
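
For reference, the same jars can also be passed with --jars (a sketch of the
equivalent launch, assuming a Spark 1.x spark-shell):

/usr/local/spark/bin/spark-shell --master local --driver-memory 2g
--executor-memory 2g --executor-cores 1
--jars /home/scala_test_class/nscala-time_2.11-1.8.0.jar,/home/scala_test_class/joda-time-2.4.jar,/home/scala_test_class/joda-convert-1.5.jar

A NoSuchMethodError on scala.Predef like the one above is typically a Scala
binary-compatibility problem: the prebuilt Spark 1.x binaries are compiled
against Scala 2.10, while nscala-time_2.11 targets Scala 2.11, so the matching
nscala-time_2.10 artifact would likely be needed instead (an assumption, not
verified against this installation).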


Your help is very much appreciated,


Regards,


Hammam




  

How to retrieve the value from sql.row by column name

2015-02-16 Thread Eric Bell
Is it possible to reference a column from a SchemaRDD using the column's 
name instead of its number?


For example, let's say I've created a SchemaRDD from an avro file:

val sqlContext = new SQLContext(sc)
import sqlContext._
val caper = sqlContext.avroFile("hdfs://localhost:9000/sma/raw_avro/caper")
caper.registerTempTable("caper")

scala> caper
res20: org.apache.spark.sql.SchemaRDD = SchemaRDD[0] at RDD at 
SchemaRDD.scala:108

== Query Plan ==
== Physical Plan ==
PhysicalRDD 
[ADMDISP#0,age#1,AMBSURG#2,apptdt_skew#3,APPTSTAT#4,APPTTYPE#5,ASSGNDUR#6,CANCSTAT#7,CAPERSTAT#8,COMPLAINT#9,CPT_1#10,CPT_10#11,CPT_11#12,CPT_12#13,CPT_13#14,CPT_2#15,CPT_3#16,CPT_4#17,CPT_5#18,CPT_6#19,CPT_7#20,CPT_8#21,CPT_9#22,CPTDX_1#23,CPTDX_10#24,CPTDX_11#25,CPTDX_12#26,CPTDX_13#27,CPTDX_2#28,CPTDX_3#29,CPTDX_4#30,CPTDX_5#31,CPTDX_6#32,CPTDX_7#33,CPTDX_8#34,CPTDX_9#35,CPTMOD1_1#36,CPTMOD1_10#37,CPTMOD1_11#38,CPTMOD1_12#39,CPTMOD1_13#40,CPTMOD1_2#41,CPTMOD1_3#42,CPTMOD1_4#43,CPTMOD1_5#44,CPTMOD1_6#45,CPTMOD1_7#46,CPTMOD1_8#47,CPTMOD1_9#48,CPTMOD2_1#49,CPTMOD2_10#50,CPTMOD2_11#51,CPTMOD2_12#52,CPTMOD2_13#53,CPTMOD2_2#54,CPTMOD2_3#55,CPTMOD2_4#56,CPTMOD...

scala>

Now I want to access fields, and of course the normal thing to do is to 
use a field name, not a field number.


scala> val kv = caper.map(r => (r.ran_id, r))
<console>:23: error: value ran_id is not a member of 
org.apache.spark.sql.Row

   val kv = caper.map(r => (r.ran_id, r))

How do I do this?

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Can't I mix non-Spark properties into a .properties file and pass it to spark-submit via --properties-file?

2015-02-16 Thread Corey Nolet
We've been using commons configuration to pull our properties out of
properties files and system properties (prioritizing system properties over
others) and we add those properties to our spark conf explicitly and we use
ArgoPartser to get the command line argument for which property file to
load. We also implicitly added an extra parse args method to our SparkConf.
In our main method, we do something like this:

val sparkConf = SparkConfFactory.newSparkConf.parseModuleArgs(args)
val sparkContext = new SparkContext(sparkConf)

Now all of our externally parsed properties are in the same spark conf so
we can pull them off anywhere in the program that has access to an
rdd/sparkcontext or the spark conf directly.
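
A minimal sketch of that pattern, using plain java.util.Properties instead of
commons configuration and a hard-coded file path (both just for illustration;
the real code reads the path from the parsed arguments):

import java.io.FileInputStream
import java.util.Properties
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.JavaConverters._

val props = new Properties()
props.load(new FileInputStream("/path/to/module.properties"))  // path is an assumption

val sparkConf = new SparkConf()
// copy every external key/value pair into the SparkConf so it is reachable
// anywhere the conf (or the SparkContext) is available
props.asScala.foreach { case (k, v) => sparkConf.set(k, v) }

val sc = new SparkContext(sparkConf)
val outputDir = sc.getConf.get("job.output.dir")  // example key from the thread below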

On Mon, Feb 16, 2015 at 10:42 AM, Sean Owen so...@cloudera.com wrote:

 How about system properties? or something like Typesafe Config which
 lets you at least override something in a built-in config file on the
 command line, with props or other files.

 On Mon, Feb 16, 2015 at 3:38 PM, Emre Sevinc emre.sev...@gmail.com
 wrote:
  Sean,
 
  I'm trying this as an alternative to what I currently do. Currently I
 have
  my module.properties file for my module in the resources directory, and
 that
  file is put inside the über JAR file when I build my application with
 Maven,
  and then when I submit it using spark-submit, I can read that
  module.properties file via the traditional method:
 
 
 
  properties.load(MyModule.class.getClassLoader().getResourceAsStream("module.properties"));
 
  and everything works fine. The disadvantage is that in order to make any
  changes to that .properties file effective, I have to re-build my
  application. Therefore I'm trying to find a way to be able to send that
  module.properties file via spark-submit and read the values in it, so
 that I
  will not be forced to build my application every time I want to make a
  change in the module.properties file.
 
  I've also checked the --files option of spark-submit, but I see that
 it is
  for sending the listed files to executors (correct me if I'm wrong), what
  I'm after is being able to pass dynamic properties (key/value pairs) to
 the
  Driver program of my Spark application. And I still could not find out
 how
  to do that.
 
  --
  Emre
 
 
 
 
 
  On Mon, Feb 16, 2015 at 4:28 PM, Sean Owen so...@cloudera.com wrote:
 
  Since SparkConf is only for Spark properties, I think it will in
  general only pay attention to and preserve spark.* properties. You
  could experiment with that. In general I wouldn't rely on Spark
  mechanisms for your configuration, and you can use any config
  mechanism you like to retain your own properties.
 
  On Mon, Feb 16, 2015 at 3:26 PM, Emre Sevinc emre.sev...@gmail.com
  wrote:
   Hello,
  
   I'm using Spark 1.2.1 and have a module.properties file, and in it I
   have
   non-Spark properties, as well as Spark properties, e.g.:
  
  job.output.dir=file:///home/emre/data/mymodule/out
  
   I'm trying to pass it to spark-submit via:
  
  spark-submit --class com.myModule --master local[4] --deploy-mode
   client
   --verbose --properties-file /home/emre/data/mymodule.properties
   mymodule.jar
  
   And I thought I could read the value of my non-Spark property, namely,
   job.output.dir by using:
  
   SparkConf sparkConf = new SparkConf();
   final String validatedJSONoutputDir =
    sparkConf.get("job.output.dir");
  
   But it gives me an exception:
  
    Exception in thread "main" java.util.NoSuchElementException:
   job.output.dir
  
   Is it not possible to mix Spark and non-Spark properties in a single
   .properties file, then pass it via --properties-file and then get the
   values
   of those non-Spark properties via SparkConf?
  
   Or is there another object / method to retrieve the values for those
   non-Spark properties?
  
  
   --
   Emre Sevinç
 
 
 
 
  --
  Emre Sevinc

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: How to retrieve the value from sql.row by column name

2015-02-16 Thread Eric Bell
I am just learning Scala, so I don't actually understand what your code 
snippet is doing, but thank you; I will learn more so I can figure it out.


I am new to all of this and still trying to make the mental shift from 
normal programming to distributed programming, but it seems to me that 
the row object would know its own schema object that it came from and be 
able to ask its schema to transform a name to a column number. Am I 
missing something or is this just a matter of time constraints and this 
one just hasn't gotten into the queue yet?


Barring that, do the schema classes provide methods for doing this? I've 
looked and didn't see anything.


I've just discovered that the Python implementation for SchemaRDD does 
in fact allow for referencing by name as well as by column number. Why is 
this provided in the Python implementation but not in the Scala or Java 
implementations?


Thanks,

--eric


On 02/16/2015 10:46 AM, Michael Armbrust wrote:
For efficiency the row objects don't contain the schema so you can't 
get the column by name directly.  I usually do a select followed by 
pattern matching. Something like the following:


caper.select('ran_id).map { case Row(ranId: String) => ... }

On Mon, Feb 16, 2015 at 8:54 AM, Eric Bell e...@ericjbell.com wrote:


Is it possible to reference a column from a SchemaRDD using the
column's name instead of its number?

For example, let's say I've created a SchemaRDD from an avro file:

val sqlContext = new SQLContext(sc)
import sqlContext._
val
caper=sqlContext.avroFile("hdfs://localhost:9000/sma/raw_avro/caper")
caper.registerTempTable("caper")

scala> caper
res20: org.apache.spark.sql.SchemaRDD = SchemaRDD[0] at RDD at
SchemaRDD.scala:108
== Query Plan ==
== Physical Plan ==
PhysicalRDD

[ADMDISP#0,age#1,AMBSURG#2,apptdt_skew#3,APPTSTAT#4,APPTTYPE#5,ASSGNDUR#6,CANCSTAT#7,CAPERSTAT#8,COMPLAINT#9,CPT_1#10,CPT_10#11,CPT_11#12,CPT_12#13,CPT_13#14,CPT_2#15,CPT_3#16,CPT_4#17,CPT_5#18,CPT_6#19,CPT_7#20,CPT_8#21,CPT_9#22,CPTDX_1#23,CPTDX_10#24,CPTDX_11#25,CPTDX_12#26,CPTDX_13#27,CPTDX_2#28,CPTDX_3#29,CPTDX_4#30,CPTDX_5#31,CPTDX_6#32,CPTDX_7#33,CPTDX_8#34,CPTDX_9#35,CPTMOD1_1#36,CPTMOD1_10#37,CPTMOD1_11#38,CPTMOD1_12#39,CPTMOD1_13#40,CPTMOD1_2#41,CPTMOD1_3#42,CPTMOD1_4#43,CPTMOD1_5#44,CPTMOD1_6#45,CPTMOD1_7#46,CPTMOD1_8#47,CPTMOD1_9#48,CPTMOD2_1#49,CPTMOD2_10#50,CPTMOD2_11#51,CPTMOD2_12#52,CPTMOD2_13#53,CPTMOD2_2#54,CPTMOD2_3#55,CPTMOD2_4#56,CPTMOD...
scala>

Now I want to access fields, and of course the normal thing to do
is to use a field name, not a field number.

scala> val kv = caper.map(r => (r.ran_id, r))
<console>:23: error: value ran_id is not a member of
org.apache.spark.sql.Row
   val kv = caper.map(r => (r.ran_id, r))

How do I do this?

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org






Re: Array in broadcast can't be serialized

2015-02-16 Thread Ted Yu
Is it possible to port WrappedArraySerializer.scala to your app?

Pardon me for not knowing how to integrate Chill with Spark.

Cheers
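
A minimal sketch of the usual wiring, assuming the serializer is ported into
the app as suggested (only the registration pattern is the point; the exact
constructor of the ported serializer is not shown and would need checking
against the chill source):

import com.esotericsoftware.kryo.Kryo
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoRegistrator

class MyKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    // register the element class and the array class here; a custom
    // serializer (e.g. the ported WrappedArraySerializer) can be passed as
    // the second argument to kryo.register if plain registration is not enough
    kryo.register(classOf[ImmutableBytesWritable])
    kryo.register(classOf[Array[ImmutableBytesWritable]])
  }
}

// spark.serializer must stay a Spark serializer; chill/Kryo serializer classes
// are not valid values for it, which is what produced the ClassCastException
// quoted below
val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "xt.MyKryoRegistrator")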

On Mon, Feb 16, 2015 at 12:31 AM, Tao Xiao xiaotao.cs@gmail.com wrote:

 Thanks Ted

 After searching for a whole day, I still don't know how to let spark use
 twitter chill serialization - there are very few documents about how to
 integrate twitter chill into Spark for serialization. I tried the
 following, but an exception of java.lang.ClassCastException:
 com.twitter.chill.WrappedArraySerializer cannot be cast to
 org.apache.spark.serializer.Serializer was thrown:

 val conf = new SparkConf()
.setAppName("Test Serialization")
.set("spark.serializer",
 "com.twitter.chill.WrappedArraySerializer")


 Well, what is the correct way of configuring Spark to use the twitter
 chill serialization framework ?







 2015-02-15 22:23 GMT+08:00 Ted Yu yuzhih...@gmail.com:

 I was looking at https://github.com/twitter/chill

 It seems this would achieve what you want:
 chill-scala/src/main/scala/com/twitter/chill/WrappedArraySerializer.scala

 Cheers

 On Sat, Feb 14, 2015 at 6:36 PM, Tao Xiao xiaotao.cs@gmail.com
 wrote:

 I'm using Spark 1.1.0 and find that *ImmutableBytesWritable* can be
 serialized by Kryo but *Array[ImmutableBytesWritable] *can't be
 serialized even when I registered both of them in Kryo.

 The code is as follows:

val conf = new SparkConf()
 .setAppName("Hello Spark")
 .set("spark.serializer",
 "org.apache.spark.serializer.KryoSerializer")
 .set("spark.kryo.registrator", "xt.MyKryoRegistrator")

 val sc = new SparkContext(conf)

 val rdd = sc.parallelize(List(
 (new ImmutableBytesWritable(Bytes.toBytes("AAA")),
 new KeyValue()),
 (new ImmutableBytesWritable(Bytes.toBytes("BBB")),
 new KeyValue()),
 (new ImmutableBytesWritable(Bytes.toBytes("CCC")),
 new KeyValue()),
 (new ImmutableBytesWritable(Bytes.toBytes("DDD")),
 new KeyValue())), 4)

 // snippet 1:  a single object of *ImmutableBytesWritable* can
 be serialized in broadcast
 val partitioner = new SingleElementPartitioner(sc.broadcast(new
 ImmutableBytesWritable(Bytes.toBytes(3))))
 val ret = rdd.aggregateByKey(List[KeyValue](),
 partitioner)((xs: List[KeyValue], y: KeyValue) => y :: xs,
  (xs: List[KeyValue], ys: List[KeyValue]) => xs ::: ys).persist()
 println("\n\n\nret.count = " + ret.count + ", partition size = "
 + ret.partitions.size)

 // snippet 2: an array of *ImmutableBytesWritable* can not be
 serialized in broadcast
 val arr = Array(new ImmutableBytesWritable(Bytes.toBytes(1)),
 new ImmutableBytesWritable(Bytes.toBytes(2)), new
 ImmutableBytesWritable(Bytes.toBytes(3)))
 val newPartitioner = new ArrayPartitioner(sc.broadcast(arr))
 val ret1 = rdd.aggregateByKey(List[KeyValue](),
 newPartitioner)((xs: List[KeyValue], y: KeyValue) => y :: xs,
  (xs: List[KeyValue], ys: List[KeyValue]) => xs ::: ys)
 println("\n\n\nrdd2.count = " + ret1.count)

 sc.stop


   // the following are kryo registrator and partitioners
class MyKryoRegistrator extends KryoRegistrator {
 override def registerClasses(kryo: Kryo): Unit = {
  kryo.register(classOf[ImmutableBytesWritable])   //
 register ImmutableBytesWritable
  kryo.register(classOf[Array[ImmutableBytesWritable]])
  // register Array[ImmutableBytesWritable]
 }
}

class SingleElementPartitioner(bc:
 Broadcast[ImmutableBytesWritable]) extends Partitioner {
 override def numPartitions: Int = 5
 def v = Bytes.toInt(bc.value.get)
 override def getPartition(key: Any): Int =  v - 1
}


 class ArrayPartitioner(bc:
 Broadcast[Array[ImmutableBytesWritable]]) extends Partitioner {
 val arr = bc.value
 override def numPartitions: Int = arr.length
 override def getPartition(key: Any): Int =
 Bytes.toInt(arr(0).get)
 }



 In the code above, snippet 1 works as expected. But snippet 2 throws
 Task not serializable: java.io.NotSerializableException:
 org.apache.hadoop.hbase.io.ImmutableBytesWritable.


 So do I have to implement a Kryo serializer for Array[T] if it is used
 in a broadcast?

 Thanks









Spark newbie desires feedback on first program

2015-02-16 Thread Eric Bell
I'm a Spark newbie working on my first attempt to write an ETL 
program. I could use some feedback to make sure I'm on the right path. 
I've written a basic proof of concept that runs without errors and seems 
to work, although I might be missing some issues when this is actually 
run on more than a single node.


I am working with data about people (actually healthcare patients). I 
have an RDD that contains multiple rows per person. My overall goal is 
to create a single Person object for each person in my data. In this 
example, I am serializing to JSON, mostly because this is what I know 
how to do at the moment.


Other than general feedback, is my use of the groupByKey() and 
mapValues() methods appropriate?


Thanks!


import json

class Person:
    def __init__(self):
        self.mydata = {}
        self.cpts = []
        self.mydata['cpt'] = self.cpts

    def addRowData(self, dataRow):
        # Get the CPT codes
        cpt = dataRow.CPT_1
        if cpt:
            self.cpts.append(cpt)

    def serializeToJSON(self):
        return json.dumps(self.mydata)

def makeAPerson(rows):
    person = Person()
    for row in rows:
        person.addRowData(row)
    return person.serializeToJSON()

peopleRDD = caper_kv.groupByKey().mapValues(lambda personDataRows:
    makeAPerson(personDataRows))

peopleRDD.saveAsTextFile("hdfs://localhost:9000/sma/processJSON/people")


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Problem getting pyspark-cassandra and pyspark working

2015-02-16 Thread Mohamed Lrhazi
Hello all,

Trying the example code from this package (
https://github.com/Parsely/pyspark-cassandra) , I always get this error...

Can you see what I am doing wrong? From googling around it seems to be that
the jar is not found somehow... The Spark log shows the JAR was processed
at least.

Thank you so much.

I am using spark-1.2.1-bin-hadoop2.4.tgz

test2.py is simply:

from pyspark.context import SparkConf
from pyspark_cassandra import CassandraSparkContext, saveToCassandra
conf = SparkConf().setAppName("PySpark Cassandra Sample Driver")
conf.set("spark.cassandra.connection.host", "devzero")
sc = CassandraSparkContext(conf=conf)

[root@devzero spark]# /usr/local/bin/docker-enter spark-master bash
-c "/spark/bin/spark-submit --py-files /spark/pyspark_cassandra.py
--jars /spark/pyspark-cassandra-0.1-SNAPSHOT.jar --driver-class-path
/spark/pyspark-cassandra-0.1-SNAPSHOT.jar /spark/test2.py"
...
15/02/16 05:58:45 INFO Slf4jLogger: Slf4jLogger started
15/02/16 05:58:45 INFO Remoting: Starting remoting
15/02/16 05:58:45 INFO Remoting: Remoting started; listening on
addresses :[akka.tcp://sparkDriver@devzero:38917]
15/02/16 05:58:45 INFO Utils: Successfully started service
'sparkDriver' on port 38917.
15/02/16 05:58:45 INFO SparkEnv: Registering MapOutputTracker
15/02/16 05:58:45 INFO SparkEnv: Registering BlockManagerMaster
15/02/16 05:58:45 INFO DiskBlockManager: Created local directory at
/tmp/spark-6cdca68b-edec-4a31-b3c1-a7e9d60191e7/spark-0e977468-6e31-4bba-959a-135d9ebda193
15/02/16 05:58:45 INFO MemoryStore: MemoryStore started with capacity 265.4 MB
15/02/16 05:58:45 WARN NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where
applicable
15/02/16 05:58:46 INFO HttpFileServer: HTTP File server directory is
/tmp/spark-af61f7f5-7c0e-412c-8352-263338335fa5/spark-10b3891f-0321-44fe-ba60-1a8c102fd647
15/02/16 05:58:46 INFO HttpServer: Starting HTTP Server
15/02/16 05:58:46 INFO Utils: Successfully started service 'HTTP file
server' on port 56642.
15/02/16 05:58:46 INFO Utils: Successfully started service 'SparkUI'
on port 4040.
15/02/16 05:58:46 INFO SparkUI: Started SparkUI at http://devzero:4040
15/02/16 05:58:46 INFO SparkContext: Added JAR
file:/spark/pyspark-cassandra-0.1-SNAPSHOT.jar at
http://10.212.55.42:56642/jars/pyspark-cassandra-0.1-SNAPSHOT.jar with
timestamp 1424066326632
15/02/16 05:58:46 INFO Utils: Copying /spark/test2.py to
/tmp/spark-e8cc013e-faae-4208-8bcd-0bb6c00b1b6c/spark-54f2c41d-ae35-4efd-860c-2e5c60979b4c/test2.py
15/02/16 05:58:46 INFO SparkContext: Added file file:/spark/test2.py
at http://10.212.55.42:56642/files/test2.py with timestamp
1424066326633
15/02/16 05:58:46 INFO Utils: Copying /spark/pyspark_cassandra.py to
/tmp/spark-e8cc013e-faae-4208-8bcd-0bb6c00b1b6c/spark-54f2c41d-ae35-4efd-860c-2e5c60979b4c/pyspark_cassandra.py
15/02/16 05:58:46 INFO SparkContext: Added file
file:/spark/pyspark_cassandra.py at
http://10.212.55.42:56642/files/pyspark_cassandra.py with timestamp
1424066326642
15/02/16 05:58:46 INFO Executor: Starting executor ID driver on host localhost
15/02/16 05:58:46 INFO AkkaUtils: Connecting to HeartbeatReceiver:
akka.tcp://sparkDriver@devzero:38917/user/HeartbeatReceiver
15/02/16 05:58:46 INFO NettyBlockTransferService: Server created on 32895
15/02/16 05:58:46 INFO BlockManagerMaster: Trying to register BlockManager
15/02/16 05:58:46 INFO BlockManagerMasterActor: Registering block
manager localhost:32895 with 265.4 MB RAM, BlockManagerId(driver,
localhost, 32895)
15/02/16 05:58:46 INFO BlockManagerMaster: Registered BlockManager
15/02/16 05:58:47 INFO SparkUI: Stopped Spark web UI at http://devzero:4040
15/02/16 05:58:47 INFO DAGScheduler: Stopping DAGScheduler
15/02/16 05:58:48 INFO MapOutputTrackerMasterActor:
MapOutputTrackerActor stopped!
15/02/16 05:58:48 INFO MemoryStore: MemoryStore cleared
15/02/16 05:58:48 INFO BlockManager: BlockManager stopped
15/02/16 05:58:48 INFO BlockManagerMaster: BlockManagerMaster stopped
15/02/16 05:58:48 INFO SparkContext: Successfully stopped SparkContext
15/02/16 05:58:48 INFO RemoteActorRefProvider$RemotingTerminator:
Shutting down remote daemon.
15/02/16 05:58:48 INFO RemoteActorRefProvider$RemotingTerminator:
Remote daemon shut down; proceeding with flushing remote transports.
15/02/16 05:58:48 INFO RemoteActorRefProvider$RemotingTerminator:
Remoting shut down.
Traceback (most recent call last):
  File "/spark/test2.py", line 5, in <module>
sc = CassandraSparkContext(conf=conf)
  File "/spark/python/pyspark/context.py", line 105, in __init__
conf, jsc)
  File "/spark/pyspark_cassandra.py", line 17, in _do_init
self._jcsc = self._jvm.CassandraJavaUtil.javaFunctions(self._jsc)
  File "/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py",
line 726, in __getattr__
py4j.protocol.Py4JError: Trying to call a package.


Re: Spark Streaming and SQL checkpoint error: (java.io.NotSerializableException: org.apache.hadoop.hive.conf.HiveConf)

2015-02-16 Thread Michael Armbrust
You probably want to mark the HiveContext as @transient, as it's not valid to
use it on the slaves anyway.
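
A minimal sketch of the idea (the enclosing class is hypothetical; the point
is only the @transient lazy val holding the HiveContext so it is excluded
from serialization and recreated on demand):

import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.HiveContext

class StreamingJob(@transient val sc: SparkContext) extends Serializable {
  // kept out of checkpointed/serialized state; rebuilt lazily where needed
  @transient lazy val hiveContext = new HiveContext(sc)
}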

On Mon, Feb 16, 2015 at 1:58 AM, Haopu Wang hw...@qilinsoft.com wrote:

  I have a streaming application which registers a temp table on a
 HiveContext for each batch duration.

 The application runs well in Spark 1.1.0. But I get below error from 1.1.1.

 Do you have any suggestions to resolve it? Thank you!



 *java.io.NotSerializableException*: org.apache.hadoop.hive.conf.HiveConf

 - field (class scala.Tuple2, name: _1, type: class
 java.lang.Object)

 - object (class scala.Tuple2, (Configuration: core-default.xml,
 core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml,
 yarn-site.xml, hdfs-default.xml, hdfs-site.xml,
 org.apache.hadoop.hive.conf.LoopingByteArrayInputStream@2158ce23
 ,org.apache.hadoop.hive.ql.session.SessionState@49b6eef9))

 - field (class org.apache.spark.sql.hive.HiveContext, name: x$3,
 type: class scala.Tuple2)

 - object (class org.apache.spark.sql.hive.HiveContext,
 org.apache.spark.sql.hive.HiveContext@4e6e66a4)

 - field (class
 com.vitria.spark.streaming.api.scala.BaseQueryableDStream$$anonfun$registerTempTable$2,
 name: sqlContext$1, type: class org.apache.spark.sql.SQLContext)

- object (class
 com.vitria.spark.streaming.api.scala.BaseQueryableDStream$$anonfun$registerTempTable$2,
 <function1>)

 - field (class
 org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1, name:
 foreachFunc$1, type: interface scala.Function1)

 - object (class
 org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1,
 <function2>)

 - field (class org.apache.spark.streaming.dstream.ForEachDStream,
 name: org$apache$spark$streaming$dstream$ForEachDStream$$foreachFunc,
 type: interface scala.Function2)

 - object (class org.apache.spark.streaming.dstream.ForEachDStream,
 org.apache.spark.streaming.dstream.ForEachDStream@5ccbdc20)

 - element of array (index: 0)

 - array (class [Ljava.lang.Object;, size: 16)

 - field (class scala.collection.mutable.ArrayBuffer, name: array,
 type: class [Ljava.lang.Object;)

 - object (class scala.collection.mutable.ArrayBuffer,
 ArrayBuffer(org.apache.spark.streaming.dstream.ForEachDStream@5ccbdc20))

 - field (class org.apache.spark.streaming.DStreamGraph, name:
 outputStreams, type: class scala.collection.mutable.ArrayBuffer)

 - custom writeObject data (class
 org.apache.spark.streaming.DStreamGraph)

 - object (class org.apache.spark.streaming.DStreamGraph,
 org.apache.spark.streaming.DStreamGraph@776ae7da)

 - field (class org.apache.spark.streaming.Checkpoint, name: graph,
 type: class org.apache.spark.streaming.DStreamGraph)

 - root object (class org.apache.spark.streaming.Checkpoint,
 org.apache.spark.streaming.Checkpoint@5eade065)

 at java.io.ObjectOutputStream.writeObject0(Unknown Source)





Re: spark-local dir running out of space during long ALS run

2015-02-16 Thread Tathagata Das
Correct, brute-force cleanup is not useful. Since Spark 1.0, Spark can do
automatic cleanup of files based on which RDDs are used/garbage collected
by the JVM. That would be the best way, but it depends on the JVM GC
characteristics. If you force a GC periodically in the driver, that might
help you get rid of files in the workers that are no longer needed.
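
A rough sketch of that suggestion inside the training loop (the ALS call, the
unpersist calls and the loop itself are assumptions about the surrounding
code, not a verified fix):

import org.apache.spark.mllib.recommendation.ALS

// ratings, rank, iterations, lambda, alpha and numRuns are assumed to exist
for (run <- 1 to numRuns) {
  val model = ALS.trainImplicit(ratings, rank, iterations, lambda, alpha)
  // ... evaluate / save the model ...

  // drop references to per-run RDDs so the automatic cleaner can remove
  // their files on the workers, then nudge the driver-side JVM
  model.userFeatures.unpersist()
  model.productFeatures.unpersist()
  System.gc()
}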

TD

On Mon, Feb 16, 2015 at 12:27 AM, Antony Mayi antonym...@yahoo.com.invalid
wrote:

 spark.cleaner.ttl is not the right way - it seems to be really designed for
 streaming. Although it keeps the disk usage under control, it also causes
 loss of RDDs and broadcasts that are required later, leading to crashes.

 Is there any other way?
 thanks,
 Antony.


   On Sunday, 15 February 2015, 21:42, Antony Mayi antonym...@yahoo.com
 wrote:



 spark.cleaner.ttl ?


   On Sunday, 15 February 2015, 18:23, Antony Mayi antonym...@yahoo.com
 wrote:



 Hi,

 I am running a bigger ALS job on Spark 1.2.0 on YARN (CDH 5.3.0) - ALS is using
 about 3 billion ratings and I am doing several trainImplicit() runs in a
 loop within one Spark session. I have a four-node cluster with 3TB of disk space
 on each. Before starting the job less than 8% of the disk space is
 used. While the ALS is running I can see the disk usage rapidly growing,
 mainly because of files being stored
 under 
 yarn/local/usercache/user/appcache/application_XXX_YYY/spark-local-ZZZ-AAA.
 After about 10 hours the disk usage hits 90% and YARN kills the particular
 containers.

 Am I missing some cleanup somewhere while looping over the several
 trainImplicit() calls? Taking 4*3TB of disk space seems immense.

 thanks for any help,
 Antony.








Re: How to retrieve the value from sql.row by column name

2015-02-16 Thread Michael Armbrust
For efficiency the row objects don't contain the schema so you can't get
the column by name directly.  I usually do a select followed by pattern
matching. Something like the following:

caper.select('ran_id).map { case Row(ranId: String) => ... }

On Mon, Feb 16, 2015 at 8:54 AM, Eric Bell e...@ericjbell.com wrote:

 Is it possible to reference a column from a SchemaRDD using the column's
 name instead of its number?

 For example, let's say I've created a SchemaRDD from an avro file:

 val sqlContext = new SQLContext(sc)
 import sqlContext._
 val caper = sqlContext.avroFile("hdfs://localhost:9000/sma/raw_avro/caper")
 caper.registerTempTable("caper")

 scala> caper
 res20: org.apache.spark.sql.SchemaRDD = SchemaRDD[0] at RDD at
 SchemaRDD.scala:108
 == Query Plan ==
 == Physical Plan ==
 PhysicalRDD [ADMDISP#0,age#1,AMBSURG#2,apptdt_skew#3,APPTSTAT#4,
 APPTTYPE#5,ASSGNDUR#6,CANCSTAT#7,CAPERSTAT#8,COMPLAINT#9,CPT_1#10,CPT_10#
 11,CPT_11#12,CPT_12#13,CPT_13#14,CPT_2#15,CPT_3#16,CPT_4#17,
 CPT_5#18,CPT_6#19,CPT_7#20,CPT_8#21,CPT_9#22,CPTDX_1#23,
 CPTDX_10#24,CPTDX_11#25,CPTDX_12#26,CPTDX_13#27,CPTDX_2#28,
 CPTDX_3#29,CPTDX_4#30,CPTDX_5#31,CPTDX_6#32,CPTDX_7#33,
 CPTDX_8#34,CPTDX_9#35,CPTMOD1_1#36,CPTMOD1_10#37,CPTMOD1_11#
 38,CPTMOD1_12#39,CPTMOD1_13#40,CPTMOD1_2#41,CPTMOD1_3#42,
 CPTMOD1_4#43,CPTMOD1_5#44,CPTMOD1_6#45,CPTMOD1_7#46,
 CPTMOD1_8#47,CPTMOD1_9#48,CPTMOD2_1#49,CPTMOD2_10#50,
 CPTMOD2_11#51,CPTMOD2_12#52,CPTMOD2_13#53,CPTMOD2_2#54,
 CPTMOD2_3#55,CPTMOD2_4#56,CPTMOD...
 scala>

 Now I want to access fields, and of course the normal thing to do is to
 use a field name, not a field number.

 scala> val kv = caper.map(r => (r.ran_id, r))
 <console>:23: error: value ran_id is not a member of
 org.apache.spark.sql.Row
val kv = caper.map(r => (r.ran_id, r))

 How do I do this?

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: How to retrieve the value from sql.row by column name

2015-02-16 Thread Michael Armbrust
I can unpack the code snippet a bit:

caper.select('ran_id) is the same as saying SELECT ran_id FROM table in
SQL.  It's always a good idea to explicitly request the columns you need
right before using them.  That way you are tolerant of any changes to the
schema that might happen upstream.

The next part .map { case Row(ranId: String) => ... } is doing an
extraction to pull out the values of the row into typed variables.  This is
the same as doing .map(row => row(0).asInstanceOf[String]) or .map(row =>
row.getString(0)), but I find this syntax easier to read since it lines up
nicely with the select clause that comes right before it.  It's also less
verbose especially when pulling out a bunch of columns.

Regarding the differences between Python and Java/Scala, part of this is
just due to the nature of these languages.  Since Java/Scala are statically
typed, you will always have to explicitly say the type of the column you
are extracting (the bonus here is that they are much faster than Python due to
optimizations this strictness allows).  However, since it's already a little
more verbose, we decided not to have the more expensive ability to look up
columns in a row by name, and instead go with a faster ordinal-based API.
We could revisit this, but it's not currently something we are planning to
change.
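
A minimal sketch putting the two styles side by side (the column name and
type come from the example below; the tuples built in the map bodies are
just for illustration):

import org.apache.spark.sql.Row

// pattern-matching style: select the column, then destructure each Row
val byPattern = caper.select('ran_id).map { case Row(ranId: String) => (ranId, 1) }

// ordinal style: the same thing, reading the single selected column by index
val byOrdinal = caper.select('ran_id).map(row => (row.getString(0), 1))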

Michael

On Mon, Feb 16, 2015 at 11:04 AM, Eric Bell e...@ericjbell.com wrote:

  I am just learning scala so I don't actually understand what your code
 snippet is doing but thank you, I will learn more so I can figure it out.

 I am new to all of this and still trying to make the mental shift from
 normal programming to distributed programming, but it seems to me that the
 row object would know its own schema object that it came from and be able
 to ask its schema to transform a name to a column number. Am I missing
 something or is this just a matter of time constraints and this one just
 hasn't gotten into the queue yet?

 Barring that, do the schema classes provide methods for doing this? I've
 looked and didn't see anything.

 I've just discovered that the python implementation for SchemaRDD does in
 fact allow for referencing by name and column. Why is this provided in the
 python implementation but not scala or java implementations?

 Thanks,

 --eric



 On 02/16/2015 10:46 AM, Michael Armbrust wrote:

 For efficiency the row objects don't contain the schema so you can't get
 the column by name directly.  I usually do a select followed by pattern
 matching. Something like the following:

  caper.select('ran_id).map { case Row(ranId: String) => ... }

 On Mon, Feb 16, 2015 at 8:54 AM, Eric Bell e...@ericjbell.com wrote:

 Is it possible to reference a column from a SchemaRDD using the column's
 name instead of its number?

 For example, let's say I've created a SchemaRDD from an avro file:

 val sqlContext = new SQLContext(sc)
 import sqlContext._
 val caper = sqlContext.avroFile("hdfs://localhost:9000/sma/raw_avro/caper")
 caper.registerTempTable("caper")

 scala> caper
 res20: org.apache.spark.sql.SchemaRDD = SchemaRDD[0] at RDD at
 SchemaRDD.scala:108
 == Query Plan ==
 == Physical Plan ==
 PhysicalRDD
 [ADMDISP#0,age#1,AMBSURG#2,apptdt_skew#3,APPTSTAT#4,APPTTYPE#5,ASSGNDUR#6,CANCSTAT#7,CAPERSTAT#8,COMPLAINT#9,CPT_1#10,CPT_10#11,CPT_11#12,CPT_12#13,CPT_13#14,CPT_2#15,CPT_3#16,CPT_4#17,CPT_5#18,CPT_6#19,CPT_7#20,CPT_8#21,CPT_9#22,CPTDX_1#23,CPTDX_10#24,CPTDX_11#25,CPTDX_12#26,CPTDX_13#27,CPTDX_2#28,CPTDX_3#29,CPTDX_4#30,CPTDX_5#31,CPTDX_6#32,CPTDX_7#33,CPTDX_8#34,CPTDX_9#35,CPTMOD1_1#36,CPTMOD1_10#37,CPTMOD1_11#38,CPTMOD1_12#39,CPTMOD1_13#40,CPTMOD1_2#41,CPTMOD1_3#42,CPTMOD1_4#43,CPTMOD1_5#44,CPTMOD1_6#45,CPTMOD1_7#46,CPTMOD1_8#47,CPTMOD1_9#48,CPTMOD2_1#49,CPTMOD2_10#50,CPTMOD2_11#51,CPTMOD2_12#52,CPTMOD2_13#53,CPTMOD2_2#54,CPTMOD2_3#55,CPTMOD2_4#56,CPTMOD...
 scala>

 Now I want to access fields, and of course the normal thing to do is to
 use a field name, not a field number.

 scala> val kv = caper.map(r => (r.ran_id, r))
 <console>:23: error: value ran_id is not a member of
 org.apache.spark.sql.Row
val kv = caper.map(r => (r.ran_id, r))

 How do I do this?

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org






Re: Problem getting pyspark-cassandra and pyspark working

2015-02-16 Thread Mohamed Lrhazi
Yes, I am sure the system can't find the jar, but how do I fix that? My
submit command includes the jar:

/spark/bin/spark-submit --py-files /spark/pyspark_cassandra.py --jars
/spark/pyspark-cassandra-0.1-SNAPSHOT.jar --driver-class-path
/spark/pyspark-cassandra-0.1-SNAPSHOT.jar /spark/test2.py

and the spark output seems to indicate it is handling it:

15/02/16 05:58:46 INFO SparkContext: Added JAR
file:/spark/pyspark-cassandra-0.1-SNAPSHOT.jar at
http://10.212.55.42:56642/jars/pyspark-cassandra-0.1-SNAPSHOT.jar with
timestamp 1424066326632


I don't really know what else I could try; any suggestions highly
appreciated.

Thanks,
Mohamed.


On Mon, Feb 16, 2015 at 4:04 PM, Davies Liu dav...@databricks.com wrote:

 It seems that the jar for cassandra is not loaded, you should have
 them in the classpath.

 On Mon, Feb 16, 2015 at 12:08 PM, Mohamed Lrhazi
 mohamed.lrh...@georgetown.edu wrote:
  Hello all,
 
  Trying the example code from this package
  (https://github.com/Parsely/pyspark-cassandra) , I always get this
 error...
 
  Can you see what I am doing wrong? from googling arounf it seems to be
 that
  the jar is not found somehow...  The spark log shows the JAR was
 processed
  at least.
 
  Thank you so much.
 
  am using spark-1.2.1-bin-hadoop2.4.tgz
 
  test2.py is simply:
 
  from pyspark.context import SparkConf
  from pyspark_cassandra import CassandraSparkContext, saveToCassandra
  conf = SparkConf().setAppName("PySpark Cassandra Sample Driver")
  conf.set("spark.cassandra.connection.host", "devzero")
  sc = CassandraSparkContext(conf=conf)
 
  [root@devzero spark]# /usr/local/bin/docker-enter  spark-master bash -c
  /spark/bin/spark-submit --py-files /spark/pyspark_cassandra.py --jars
  /spark/pyspark-cassandra-0.1-SNAPSHOT.jar --driver-class-path
  /spark/pyspark-cassandra-0.1-SNAPSHOT.jar /spark/test2.py
  ...
  15/02/16 05:58:45 INFO Slf4jLogger: Slf4jLogger started
  15/02/16 05:58:45 INFO Remoting: Starting remoting
  15/02/16 05:58:45 INFO Remoting: Remoting started; listening on addresses
  :[akka.tcp://sparkDriver@devzero:38917]
  15/02/16 05:58:45 INFO Utils: Successfully started service 'sparkDriver'
 on
  port 38917.
  15/02/16 05:58:45 INFO SparkEnv: Registering MapOutputTracker
  15/02/16 05:58:45 INFO SparkEnv: Registering BlockManagerMaster
  15/02/16 05:58:45 INFO DiskBlockManager: Created local directory at
 
 /tmp/spark-6cdca68b-edec-4a31-b3c1-a7e9d60191e7/spark-0e977468-6e31-4bba-959a-135d9ebda193
  15/02/16 05:58:45 INFO MemoryStore: MemoryStore started with capacity
 265.4
  MB
  15/02/16 05:58:45 WARN NativeCodeLoader: Unable to load native-hadoop
  library for your platform... using builtin-java classes where applicable
  15/02/16 05:58:46 INFO HttpFileServer: HTTP File server directory is
 
 /tmp/spark-af61f7f5-7c0e-412c-8352-263338335fa5/spark-10b3891f-0321-44fe-ba60-1a8c102fd647
  15/02/16 05:58:46 INFO HttpServer: Starting HTTP Server
  15/02/16 05:58:46 INFO Utils: Successfully started service 'HTTP file
  server' on port 56642.
  15/02/16 05:58:46 INFO Utils: Successfully started service 'SparkUI' on
 port
  4040.
  15/02/16 05:58:46 INFO SparkUI: Started SparkUI at http://devzero:4040
  15/02/16 05:58:46 INFO SparkContext: Added JAR
  file:/spark/pyspark-cassandra-0.1-SNAPSHOT.jar at
  http://10.212.55.42:56642/jars/pyspark-cassandra-0.1-SNAPSHOT.jar with
  timestamp 1424066326632
  15/02/16 05:58:46 INFO Utils: Copying /spark/test2.py to
 
 /tmp/spark-e8cc013e-faae-4208-8bcd-0bb6c00b1b6c/spark-54f2c41d-ae35-4efd-860c-2e5c60979b4c/test2.py
  15/02/16 05:58:46 INFO SparkContext: Added file file:/spark/test2.py at
  http://10.212.55.42:56642/files/test2.py with timestamp 1424066326633
  15/02/16 05:58:46 INFO Utils: Copying /spark/pyspark_cassandra.py to
 
 /tmp/spark-e8cc013e-faae-4208-8bcd-0bb6c00b1b6c/spark-54f2c41d-ae35-4efd-860c-2e5c60979b4c/pyspark_cassandra.py
  15/02/16 05:58:46 INFO SparkContext: Added file
  file:/spark/pyspark_cassandra.py at
  http://10.212.55.42:56642/files/pyspark_cassandra.py with timestamp
  1424066326642
  15/02/16 05:58:46 INFO Executor: Starting executor ID driver on host
  localhost
  15/02/16 05:58:46 INFO AkkaUtils: Connecting to HeartbeatReceiver:
  akka.tcp://sparkDriver@devzero:38917/user/HeartbeatReceiver
  15/02/16 05:58:46 INFO NettyBlockTransferService: Server created on 32895
  15/02/16 05:58:46 INFO BlockManagerMaster: Trying to register
 BlockManager
  15/02/16 05:58:46 INFO BlockManagerMasterActor: Registering block manager
  localhost:32895 with 265.4 MB RAM, BlockManagerId(driver, localhost,
  32895)
  15/02/16 05:58:46 INFO BlockManagerMaster: Registered BlockManager
  15/02/16 05:58:47 INFO SparkUI: Stopped Spark web UI at
 http://devzero:4040
  15/02/16 05:58:47 INFO DAGScheduler: Stopping DAGScheduler
  15/02/16 05:58:48 INFO MapOutputTrackerMasterActor: MapOutputTrackerActor
  stopped!
  15/02/16 05:58:48 INFO MemoryStore: MemoryStore cleared
  15/02/16 05:58:48 INFO BlockManager: BlockManager stopped
  

Re: Spark newbie desires feedback on first program

2015-02-16 Thread Charles Feduke
I cannot comment about the correctness of Python code. I will assume your
caper_kv is keyed on something that uniquely identifies all the rows that
make up the person's record so your group by key makes sense, as does the
map. (I will also assume all of the rows that comprise a single person's
record will always fit in memory. If not you will need another approach.)

You should be able to get away with removing the localhost:9000 from your
HDFS URL, i.e., hdfs:///sma/processJSON/people and let your HDFS
configuration for Spark supply the missing pieces.
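
For example, the save call from the original program would then be (a sketch,
assuming fs.defaultFS already points at the same namenode):

peopleRDD.saveAsTextFile("hdfs:///sma/processJSON/people")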

On Mon Feb 16 2015 at 3:38:31 PM Eric Bell e...@ericjbell.com wrote:

 I'm a spark newbie working on his first attempt to do write an ETL
 program. I could use some feedback to make sure I'm on the right path.
 I've written a basic proof of concept that runs without errors and seems
 to work, although I might be missing some issues when this is actually
 run on more than a single node.

 I am working with data about people (actually healthcare patients). I
 have an RDD that contains multiple rows per person. My overall goal is
 to create a single Person object for each person in my data. In this
 example, I am serializing to JSON, mostly because this is what I know
 how to do at the moment.

 Other than general feedback, is my use of the groupByKey() and
 mapValues() methods appropriate?

 Thanks!


 import json

 class Person:
  def __init__(self):
  self.mydata={}
  self.cpts = []
  self.mydata['cpt']=self.cpts
  def addRowData(self, dataRow):
  # Get the CPT codes
  cpt = dataRow.CPT_1
  if cpt:
  self.cpts.append(cpt)
  def serializeToJSON(self):
  return json.dumps(self.mydata)

 def makeAPerson(rows):
  person = Person()
  for row in rows:
  person.addRowData(row)
  return person.serializeToJSON()

 peopleRDD = caper_kv.groupByKey().mapValues(lambda personDataRows:
 makeAPerson(personDataRows))
  peopleRDD.saveAsTextFile("hdfs://localhost:9000/sma/processJSON/people")


 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Spark newbie desires feedback on first program

2015-02-16 Thread Charles Feduke
My first problem was somewhat similar to yours. You won't find a whole lot
of JDBC to Spark examples since I think a lot of the adoption for Spark is
from teams already experienced with Hadoop and already have an established
big data solution (so their data is already extracted from whatever
sources, e.g., log files, Hive, other M/R jobs). JDBC support is
somewhat... lacking.

Our application uses a 12 node PostgreSQL distributed RDBMS that is sharded
at the application tier. I had to write my own JDBC RDD to support this
logical schema. However because you are coming from a single MySQL DB you
should be able to get away with using the JdbcRDD[1]... but I cannot find a
reference to it for the Python API so someone familiar with using Python
and Spark will have to chime in on that.

You need to consider _how_ the data gets from MySQL to the workers. It
might work to pull all of the data to a single node and then parallelize
that data across the cluster, but it's not going to be as efficient as range
querying from each worker in the cluster to the database. If you're working
with TBs of data then you will see very big benefits by distributing the
data across workers from the get go; if you don't it will take however long
it takes to copy all the data to a single worker and distribute as your
startup code for each execution. (By range querying what I mean is
basically what the JdbcRDD does - it forces you to include a conditional
statement like id >= ? AND id <= ? in your SQL which it formats at each
worker so each worker only gets a piece of the pie). The JdbcRDD makes
assumptions about numeric keys for range querying.

The next thing to consider is if you're going against your production
database, will massive reads cause degradation for production users? I am
using read replicas to mitigate this for our production installation, as
copying TBs of data out of PostgreSQL would have some negative effect on
our users. Running your jobs during low traffic is obviously an option
here, as is restoring a read-only version from backup and explicitly
querying that instance (in which case parallelizing user IDs and querying
MySQL directly might get you near to the JdbcRDD behavior). And of course
if the MySQL instance is already your analytics solution then query on.

1.
https://spark.apache.org/docs/1.1.0/api/java/org/apache/spark/rdd/JdbcRDD.html
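
A rough Scala sketch of the JdbcRDD shape described above (the URL, table,
columns and key bounds are all hypothetical; a Python equivalent would need
whatever JDBC bridge is available there):

import java.sql.DriverManager
import org.apache.spark.rdd.JdbcRDD

val personRows = new JdbcRDD(
  sc,
  () => DriverManager.getConnection("jdbc:mysql://dbhost/clinic?user=ro&password=secret"),
  "SELECT person_id, cpt_1 FROM caper WHERE person_id >= ? AND person_id <= ?",
  1L, 1000000L,  // key range that gets split across partitions
  20,            // number of partitions, i.e. concurrent range queries
  rs => (rs.getLong("person_id"), rs.getString("cpt_1")))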

On Mon Feb 16 2015 at 4:42:30 PM Eric Bell e...@ericjbell.com wrote:

 Thanks Charles. I just realized a few minutes ago that I neglected to
 show the step where I generated the key on the person ID. Thanks for the
 pointer on the HDFS URL.

 Next step is to process data from multiple RDDS. My data originates from
 7 tables in a MySQL database. I used sqoop to create avro files from
 these tables, and in turn created RDDs using SparkSQL from the avro
 files. Since the groupByKey only operates on a single RDD, I'm not quite
 sure yet how I'm going to process 7 tables as a transformation to get
 all the data I need into my objects.

  I'm vacillating on whether I should be doing it this way, or if it
 would be a lot simpler to query MySQL to get all the Person IDs,
 parallelize them, and have my Person class make queries directly to the
 MySQL database. Since in theory I only have to do this once, I'm not
 sure there's much to be gained in moving the data from MySQL to Spark
 first.

 I have yet to find any non-trivial examples of ETL logic on the web ...
 it seems like it's mostly word count map-reduce replacements.

 On 02/16/2015 01:32 PM, Charles Feduke wrote:
  I cannot comment about the correctness of Python code. I will assume
  your caper_kv is keyed on something that uniquely identifies all the
  rows that make up the person's record so your group by key makes
  sense, as does the map. (I will also assume all of the rows that
  comprise a single person's record will always fit in memory. If not
  you will need another approach.)
 
  You should be able to get away with removing the localhost:9000 from
  your HDFS URL, i.e., hdfs:///sma/processJSON/people and let your
  HDFS configuration for Spark supply the missing pieces.
 




Re: Problem getting pyspark-cassandra and pyspark working

2015-02-16 Thread Davies Liu
It also needs the Cassandra jar: com.datastax.spark.connector.CassandraJavaUtil

Is it included in /spark/pyspark-cassandra-0.1-SNAPSHOT.jar?
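
If it is not, one option is to add the connector assembly alongside the
existing jar (a sketch; the artifact name and version are assumptions and
must match the Spark/Scala build in use):

/spark/bin/spark-submit --py-files /spark/pyspark_cassandra.py
--jars /spark/pyspark-cassandra-0.1-SNAPSHOT.jar,/spark/spark-cassandra-connector-assembly-1.2.0.jar
--driver-class-path /spark/pyspark-cassandra-0.1-SNAPSHOT.jar:/spark/spark-cassandra-connector-assembly-1.2.0.jar
/spark/test2.py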



On Mon, Feb 16, 2015 at 1:20 PM, Mohamed Lrhazi
mohamed.lrh...@georgetown.edu wrote:
 Yes, am sure the system cant find the jar.. but how do I fix that... my
 submit command includes the jar:

 /spark/bin/spark-submit --py-files /spark/pyspark_cassandra.py --jars
 /spark/pyspark-cassandra-0.1-SNAPSHOT.jar --driver-class-path
 /spark/pyspark-cassandra-0.1-SNAPSHOT.jar /spark/test2.py

 and the spark output seems to indicate it is handling it:

 15/02/16 05:58:46 INFO SparkContext: Added JAR
 file:/spark/pyspark-cassandra-0.1-SNAPSHOT.jar at
 http://10.212.55.42:56642/jars/pyspark-cassandra-0.1-SNAPSHOT.jar with
 timestamp 1424066326632


 I don't really know what else I could try any suggestions highly
 appreciated.

 Thanks,
 Mohamed.


 On Mon, Feb 16, 2015 at 4:04 PM, Davies Liu dav...@databricks.com wrote:

 It seems that the jar for cassandra is not loaded, you should have
 them in the classpath.

 On Mon, Feb 16, 2015 at 12:08 PM, Mohamed Lrhazi
 mohamed.lrh...@georgetown.edu wrote:
  Hello all,
 
  Trying the example code from this package
  (https://github.com/Parsely/pyspark-cassandra) , I always get this
  error...
 
  Can you see what I am doing wrong? from googling arounf it seems to be
  that
  the jar is not found somehow...  The spark log shows the JAR was
  processed
  at least.
 
  Thank you so much.
 
  am using spark-1.2.1-bin-hadoop2.4.tgz
 
  test2.py is simply:
 
  from pyspark.context import SparkConf
  from pyspark_cassandra import CassandraSparkContext, saveToCassandra
  conf = SparkConf().setAppName(PySpark Cassandra Sample Driver)
  conf.set(spark.cassandra.connection.host, devzero)
  sc = CassandraSparkContext(conf=conf)
 
  [root@devzero spark]# /usr/local/bin/docker-enter  spark-master bash -c
  /spark/bin/spark-submit --py-files /spark/pyspark_cassandra.py --jars
  /spark/pyspark-cassandra-0.1-SNAPSHOT.jar --driver-class-path
  /spark/pyspark-cassandra-0.1-SNAPSHOT.jar /spark/test2.py
  ...
  15/02/16 05:58:45 INFO Slf4jLogger: Slf4jLogger started
  15/02/16 05:58:45 INFO Remoting: Starting remoting
  15/02/16 05:58:45 INFO Remoting: Remoting started; listening on
  addresses
  :[akka.tcp://sparkDriver@devzero:38917]
  15/02/16 05:58:45 INFO Utils: Successfully started service 'sparkDriver'
  on
  port 38917.
  15/02/16 05:58:45 INFO SparkEnv: Registering MapOutputTracker
  15/02/16 05:58:45 INFO SparkEnv: Registering BlockManagerMaster
  15/02/16 05:58:45 INFO DiskBlockManager: Created local directory at
 
  /tmp/spark-6cdca68b-edec-4a31-b3c1-a7e9d60191e7/spark-0e977468-6e31-4bba-959a-135d9ebda193
  15/02/16 05:58:45 INFO MemoryStore: MemoryStore started with capacity
  265.4
  MB
  15/02/16 05:58:45 WARN NativeCodeLoader: Unable to load native-hadoop
  library for your platform... using builtin-java classes where applicable
  15/02/16 05:58:46 INFO HttpFileServer: HTTP File server directory is
 
  /tmp/spark-af61f7f5-7c0e-412c-8352-263338335fa5/spark-10b3891f-0321-44fe-ba60-1a8c102fd647
  15/02/16 05:58:46 INFO HttpServer: Starting HTTP Server
  15/02/16 05:58:46 INFO Utils: Successfully started service 'HTTP file
  server' on port 56642.
  15/02/16 05:58:46 INFO Utils: Successfully started service 'SparkUI' on
  port
  4040.
  15/02/16 05:58:46 INFO SparkUI: Started SparkUI at http://devzero:4040
  15/02/16 05:58:46 INFO SparkContext: Added JAR
  file:/spark/pyspark-cassandra-0.1-SNAPSHOT.jar at
  http://10.212.55.42:56642/jars/pyspark-cassandra-0.1-SNAPSHOT.jar with
  timestamp 1424066326632
  15/02/16 05:58:46 INFO Utils: Copying /spark/test2.py to
 
  /tmp/spark-e8cc013e-faae-4208-8bcd-0bb6c00b1b6c/spark-54f2c41d-ae35-4efd-860c-2e5c60979b4c/test2.py
  15/02/16 05:58:46 INFO SparkContext: Added file file:/spark/test2.py at
  http://10.212.55.42:56642/files/test2.py with timestamp 1424066326633
  15/02/16 05:58:46 INFO Utils: Copying /spark/pyspark_cassandra.py to
 
  /tmp/spark-e8cc013e-faae-4208-8bcd-0bb6c00b1b6c/spark-54f2c41d-ae35-4efd-860c-2e5c60979b4c/pyspark_cassandra.py
  15/02/16 05:58:46 INFO SparkContext: Added file
  file:/spark/pyspark_cassandra.py at
  http://10.212.55.42:56642/files/pyspark_cassandra.py with timestamp
  1424066326642
  15/02/16 05:58:46 INFO Executor: Starting executor ID driver on host
  localhost
  15/02/16 05:58:46 INFO AkkaUtils: Connecting to HeartbeatReceiver:
  akka.tcp://sparkDriver@devzero:38917/user/HeartbeatReceiver
  15/02/16 05:58:46 INFO NettyBlockTransferService: Server created on
  32895
  15/02/16 05:58:46 INFO BlockManagerMaster: Trying to register
  BlockManager
  15/02/16 05:58:46 INFO BlockManagerMasterActor: Registering block
  manager
  localhost:32895 with 265.4 MB RAM, BlockManagerId(driver, localhost,
  32895)
  15/02/16 05:58:46 INFO BlockManagerMaster: Registered BlockManager
  15/02/16 05:58:47 INFO SparkUI: Stopped Spark web UI at
  http://devzero:4040
  15/02/16 

Re: Spark newbie desires feedback on first program

2015-02-16 Thread Eric Bell
Thanks Charles. I just realized a few minutes ago that I neglected to 
show the step where I generated the key on the person ID. Thanks for the 
pointer on the HDFS URL.


Next step is to process data from multiple RDDS. My data originates from 
7 tables in a MySQL database. I used sqoop to create avro files from 
these tables, and in turn created RDDs using SparkSQL from the avro 
files. Since the groupByKey only operates on a single RDD, I'm not quite 
sure yet how I'm going to process 7 tables as a transformation to get 
all the data I need into my objects.


I'm vacillating on whether I should be doing it this way, or if it 
would be a lot simpler to query MySQL to get all the Person IDs, 
parallelize them, and have my Person class make queries directly to the 
MySQL database. Since in theory I only have to do this once, I'm not 
sure there's much to be gained in moving the data from MySQL to Spark first.


I have yet to find any non-trivial examples of ETL logic on the web ... 
it seems like it's mostly word count map-reduce replacements.


On 02/16/2015 01:32 PM, Charles Feduke wrote:
I cannot comment about the correctness of Python code. I will assume 
your caper_kv is keyed on something that uniquely identifies all the 
rows that make up the person's record so your group by key makes 
sense, as does the map. (I will also assume all of the rows that 
comprise a single person's record will always fit in memory. If not 
you will need another approach.)


You should be able to get away with removing the localhost:9000 from 
your HDFS URL, i.e., hdfs:///sma/processJSON/people and let your 
HDFS configuration for Spark supply the missing pieces.





-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Problem getting pyspark-cassandra and pyspark working

2015-02-16 Thread Mohamed Lrhazi
Oh, I don't know. Thanks a lot Davies, gonna figure that out now.

On Mon, Feb 16, 2015 at 5:31 PM, Davies Liu dav...@databricks.com wrote:

 It also need the Cassandra jar:
 com.datastax.spark.connector.CassandraJavaUtil

 Is it included in  /spark/pyspark-cassandra-0.1-SNAPSHOT.jar ?



 On Mon, Feb 16, 2015 at 1:20 PM, Mohamed Lrhazi
 mohamed.lrh...@georgetown.edu wrote:
  Yes, am sure the system cant find the jar.. but how do I fix that... my
  submit command includes the jar:
 
  /spark/bin/spark-submit --py-files /spark/pyspark_cassandra.py --jars
  /spark/pyspark-cassandra-0.1-SNAPSHOT.jar --driver-class-path
  /spark/pyspark-cassandra-0.1-SNAPSHOT.jar /spark/test2.py
 
  and the spark output seems to indicate it is handling it:
 
  15/02/16 05:58:46 INFO SparkContext: Added JAR
  file:/spark/pyspark-cassandra-0.1-SNAPSHOT.jar at
  http://10.212.55.42:56642/jars/pyspark-cassandra-0.1-SNAPSHOT.jar with
  timestamp 1424066326632
 
 
  I don't really know what else I could try any suggestions highly
  appreciated.
 
  Thanks,
  Mohamed.
 
 
  On Mon, Feb 16, 2015 at 4:04 PM, Davies Liu dav...@databricks.com
 wrote:
 
  It seems that the jar for cassandra is not loaded, you should have
  them in the classpath.
 
  On Mon, Feb 16, 2015 at 12:08 PM, Mohamed Lrhazi
  mohamed.lrh...@georgetown.edu wrote:
   Hello all,
  
   Trying the example code from this package
   (https://github.com/Parsely/pyspark-cassandra) , I always get this
   error...
  
   Can you see what I am doing wrong? from googling arounf it seems to be
   that
   the jar is not found somehow...  The spark log shows the JAR was
   processed
   at least.
  
   Thank you so much.
  
   am using spark-1.2.1-bin-hadoop2.4.tgz
  
   test2.py is simply:
  
   from pyspark.context import SparkConf
   from pyspark_cassandra import CassandraSparkContext, saveToCassandra
   conf = SparkConf().setAppName(PySpark Cassandra Sample Driver)
   conf.set(spark.cassandra.connection.host, devzero)
   sc = CassandraSparkContext(conf=conf)
  
   [root@devzero spark]# /usr/local/bin/docker-enter  spark-master bash
 -c
   /spark/bin/spark-submit --py-files /spark/pyspark_cassandra.py --jars
   /spark/pyspark-cassandra-0.1-SNAPSHOT.jar --driver-class-path
   /spark/pyspark-cassandra-0.1-SNAPSHOT.jar /spark/test2.py
   ...
   15/02/16 05:58:45 INFO Slf4jLogger: Slf4jLogger started
   15/02/16 05:58:45 INFO Remoting: Starting remoting
   15/02/16 05:58:45 INFO Remoting: Remoting started; listening on
   addresses
   :[akka.tcp://sparkDriver@devzero:38917]
   15/02/16 05:58:45 INFO Utils: Successfully started service
 'sparkDriver'
   on
   port 38917.
   15/02/16 05:58:45 INFO SparkEnv: Registering MapOutputTracker
   15/02/16 05:58:45 INFO SparkEnv: Registering BlockManagerMaster
   15/02/16 05:58:45 INFO DiskBlockManager: Created local directory at
  
  
 /tmp/spark-6cdca68b-edec-4a31-b3c1-a7e9d60191e7/spark-0e977468-6e31-4bba-959a-135d9ebda193
   15/02/16 05:58:45 INFO MemoryStore: MemoryStore started with capacity
   265.4
   MB
   15/02/16 05:58:45 WARN NativeCodeLoader: Unable to load native-hadoop
   library for your platform... using builtin-java classes where
 applicable
   15/02/16 05:58:46 INFO HttpFileServer: HTTP File server directory is
  
  
 /tmp/spark-af61f7f5-7c0e-412c-8352-263338335fa5/spark-10b3891f-0321-44fe-ba60-1a8c102fd647
   15/02/16 05:58:46 INFO HttpServer: Starting HTTP Server
   15/02/16 05:58:46 INFO Utils: Successfully started service 'HTTP file
   server' on port 56642.
   15/02/16 05:58:46 INFO Utils: Successfully started service 'SparkUI'
 on
   port
   4040.
   15/02/16 05:58:46 INFO SparkUI: Started SparkUI at
 http://devzero:4040
   15/02/16 05:58:46 INFO SparkContext: Added JAR
   file:/spark/pyspark-cassandra-0.1-SNAPSHOT.jar at
   http://10.212.55.42:56642/jars/pyspark-cassandra-0.1-SNAPSHOT.jar
 with
   timestamp 1424066326632
   15/02/16 05:58:46 INFO Utils: Copying /spark/test2.py to
  
  
 /tmp/spark-e8cc013e-faae-4208-8bcd-0bb6c00b1b6c/spark-54f2c41d-ae35-4efd-860c-2e5c60979b4c/test2.py
   15/02/16 05:58:46 INFO SparkContext: Added file file:/spark/test2.py
 at
   http://10.212.55.42:56642/files/test2.py with timestamp 1424066326633
   15/02/16 05:58:46 INFO Utils: Copying /spark/pyspark_cassandra.py to
  
  
 /tmp/spark-e8cc013e-faae-4208-8bcd-0bb6c00b1b6c/spark-54f2c41d-ae35-4efd-860c-2e5c60979b4c/pyspark_cassandra.py
   15/02/16 05:58:46 INFO SparkContext: Added file
   file:/spark/pyspark_cassandra.py at
   http://10.212.55.42:56642/files/pyspark_cassandra.py with timestamp
   1424066326642
   15/02/16 05:58:46 INFO Executor: Starting executor ID driver on host
   localhost
   15/02/16 05:58:46 INFO AkkaUtils: Connecting to HeartbeatReceiver:
   akka.tcp://sparkDriver@devzero:38917/user/HeartbeatReceiver
   15/02/16 05:58:46 INFO NettyBlockTransferService: Server created on
   32895
   15/02/16 05:58:46 INFO BlockManagerMaster: Trying to register
   BlockManager
   15/02/16 05:58:46 INFO 

OOM error

2015-02-16 Thread Harshvardhan Chauhan
Hi All,


I need some help with Out Of Memory errors in my application. I am using
Spark 1.1.0 and my application is using the Java API. I am running my app on
EC2 on 25 m3.xlarge (4 Cores, 15GB Memory) instances. The app only fails
sometimes. Lots of mapToPair tasks are failing. My app is configured to run
120 executors and executor memory is 2G.

These are the various errors I see in my logs.

15/02/16 10:53:48 INFO storage.MemoryStore: Block broadcast_1 of size
4680 dropped from memory (free 257277829)
15/02/16 10:53:49 WARN channel.DefaultChannelPipeline: An exception
was thrown by a user handler while handling an exception event ([id:
0x6e0138a3, /10.61.192.194:35196 = /10.164.164.228:49445] EXCEPTION:
java.lang.OutOfMemoryError: Java heap space)
java.lang.OutOfMemoryError: Java heap space
at java.nio.HeapByteBuffer.init(HeapByteBuffer.java:57)
at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
at 
org.jboss.netty.buffer.CompositeChannelBuffer.toByteBuffer(CompositeChannelBuffer.java:649)
at 
org.jboss.netty.buffer.AbstractChannelBuffer.toByteBuffer(AbstractChannelBuffer.java:530)
at 
org.jboss.netty.channel.socket.nio.SocketSendBufferPool.acquire(SocketSendBufferPool.java:77)
at 
org.jboss.netty.channel.socket.nio.SocketSendBufferPool.acquire(SocketSendBufferPool.java:46)
at 
org.jboss.netty.channel.socket.nio.AbstractNioWorker.write0(AbstractNioWorker.java:194)
at 
org.jboss.netty.channel.socket.nio.AbstractNioWorker.writeFromTaskLoop(AbstractNioWorker.java:152)
at 
org.jboss.netty.channel.socket.nio.AbstractNioChannel$WriteTask.run(AbstractNioChannel.java:335)
at 
org.jboss.netty.channel.socket.nio.AbstractNioSelector.processTaskQueue(AbstractNioSelector.java:366)
at 
org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:290)
at 
org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:90)
at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
15/02/16 10:53:49 WARN channel.DefaultChannelPipeline: An exception
was thrown by a user handler while handling an exception event ([id:
0x2d0c1db1, /10.169.226.254:55790 = /10.164.164.228:49445] EXCEPTION:
java.lang.OutOfMemoryError: Java heap space)
java.lang.OutOfMemoryError: Java heap space
at java.nio.HeapByteBuffer.init(HeapByteBuffer.java:57)
at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
at 
org.jboss.netty.buffer.CompositeChannelBuffer.toByteBuffer(CompositeChannelBuffer.java:649)
at 
org.jboss.netty.buffer.AbstractChannelBuffer.toByteBuffer(AbstractChannelBuffer.java:530)
at 
org.jboss.netty.channel.socket.nio.SocketSendBufferPool.acquire(SocketSendBufferPool.java:77)
at 
org.jboss.netty.channel.socket.nio.SocketSendBufferPool.acquire(SocketSendBufferPool.java:46)
at 
org.jboss.netty.channel.socket.nio.AbstractNioWorker.write0(AbstractNioWorker.java:194)
at 
org.jboss.netty.channel.socket.nio.AbstractNioWorker.writeFromTaskLoop(AbstractNioWorker.java:152)
at 
org.jboss.netty.channel.socket.nio.AbstractNioChannel$WriteTask.run(AbstractNioChannel.java:335)
at 
org.jboss.netty.channel.socket.nio.AbstractNioSelector.processTaskQueue(AbstractNioSelector.java:366)
at 
org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:290)
at 
org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:90)
at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
15/02/16 10:53:50 WARN channel.DefaultChannelPipeline: An exception
was thrown by a user handler while handling an exception event ([id:
0xd4211985, /10.181.125.52:60959 = /10.164.164.228:49445] EXCEPTION:
java.lang.OutOfMemoryError: Java heap space)
java.lang.OutOfMemoryError: Java heap space
at java.nio.HeapByteBuffer.init(HeapByteBuffer.java:57)
at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
at 
org.jboss.netty.buffer.CompositeChannelBuffer.toByteBuffer(CompositeChannelBuffer.java:649)
at 
org.jboss.netty.buffer.AbstractChannelBuffer.toByteBuffer(AbstractChannelBuffer.java:530)
at 
org.jboss.netty.channel.socket.nio.SocketSendBufferPool.acquire(SocketSendBufferPool.java:77)
at 
org.jboss.netty.channel.socket.nio.SocketSendBufferPool.acquire(SocketSendBufferPool.java:46)
at 

Re: Problem getting pyspark-cassandra and pyspark working

2015-02-16 Thread Davies Liu
It seems that the jar for Cassandra is not loaded; you should have
it in the classpath.

On Mon, Feb 16, 2015 at 12:08 PM, Mohamed Lrhazi
mohamed.lrh...@georgetown.edu wrote:
 Hello all,

 Trying the example code from this package
 (https://github.com/Parsely/pyspark-cassandra) , I always get this error...

 Can you see what I am doing wrong? From googling around it seems to be that
 the jar is not found somehow...  The spark log shows the JAR was processed
 at least.

 Thank you so much.

 I am using spark-1.2.1-bin-hadoop2.4.tgz

 test2.py is simply:

 from pyspark.context import SparkConf
 from pyspark_cassandra import CassandraSparkContext, saveToCassandra
 conf = SparkConf().setAppName("PySpark Cassandra Sample Driver")
 conf.set("spark.cassandra.connection.host", "devzero")
 sc = CassandraSparkContext(conf=conf)

 [root@devzero spark]# /usr/local/bin/docker-enter  spark-master bash -c
 /spark/bin/spark-submit --py-files /spark/pyspark_cassandra.py --jars
 /spark/pyspark-cassandra-0.1-SNAPSHOT.jar --driver-class-path
 /spark/pyspark-cassandra-0.1-SNAPSHOT.jar /spark/test2.py
 ...
 15/02/16 05:58:45 INFO Slf4jLogger: Slf4jLogger started
 15/02/16 05:58:45 INFO Remoting: Starting remoting
 15/02/16 05:58:45 INFO Remoting: Remoting started; listening on addresses
 :[akka.tcp://sparkDriver@devzero:38917]
 15/02/16 05:58:45 INFO Utils: Successfully started service 'sparkDriver' on
 port 38917.
 15/02/16 05:58:45 INFO SparkEnv: Registering MapOutputTracker
 15/02/16 05:58:45 INFO SparkEnv: Registering BlockManagerMaster
 15/02/16 05:58:45 INFO DiskBlockManager: Created local directory at
 /tmp/spark-6cdca68b-edec-4a31-b3c1-a7e9d60191e7/spark-0e977468-6e31-4bba-959a-135d9ebda193
 15/02/16 05:58:45 INFO MemoryStore: MemoryStore started with capacity 265.4
 MB
 15/02/16 05:58:45 WARN NativeCodeLoader: Unable to load native-hadoop
 library for your platform... using builtin-java classes where applicable
 15/02/16 05:58:46 INFO HttpFileServer: HTTP File server directory is
 /tmp/spark-af61f7f5-7c0e-412c-8352-263338335fa5/spark-10b3891f-0321-44fe-ba60-1a8c102fd647
 15/02/16 05:58:46 INFO HttpServer: Starting HTTP Server
 15/02/16 05:58:46 INFO Utils: Successfully started service 'HTTP file
 server' on port 56642.
 15/02/16 05:58:46 INFO Utils: Successfully started service 'SparkUI' on port
 4040.
 15/02/16 05:58:46 INFO SparkUI: Started SparkUI at http://devzero:4040
 15/02/16 05:58:46 INFO SparkContext: Added JAR
 file:/spark/pyspark-cassandra-0.1-SNAPSHOT.jar at
 http://10.212.55.42:56642/jars/pyspark-cassandra-0.1-SNAPSHOT.jar with
 timestamp 1424066326632
 15/02/16 05:58:46 INFO Utils: Copying /spark/test2.py to
 /tmp/spark-e8cc013e-faae-4208-8bcd-0bb6c00b1b6c/spark-54f2c41d-ae35-4efd-860c-2e5c60979b4c/test2.py
 15/02/16 05:58:46 INFO SparkContext: Added file file:/spark/test2.py at
 http://10.212.55.42:56642/files/test2.py with timestamp 1424066326633
 15/02/16 05:58:46 INFO Utils: Copying /spark/pyspark_cassandra.py to
 /tmp/spark-e8cc013e-faae-4208-8bcd-0bb6c00b1b6c/spark-54f2c41d-ae35-4efd-860c-2e5c60979b4c/pyspark_cassandra.py
 15/02/16 05:58:46 INFO SparkContext: Added file
 file:/spark/pyspark_cassandra.py at
 http://10.212.55.42:56642/files/pyspark_cassandra.py with timestamp
 1424066326642
 15/02/16 05:58:46 INFO Executor: Starting executor ID driver on host
 localhost
 15/02/16 05:58:46 INFO AkkaUtils: Connecting to HeartbeatReceiver:
 akka.tcp://sparkDriver@devzero:38917/user/HeartbeatReceiver
 15/02/16 05:58:46 INFO NettyBlockTransferService: Server created on 32895
 15/02/16 05:58:46 INFO BlockManagerMaster: Trying to register BlockManager
 15/02/16 05:58:46 INFO BlockManagerMasterActor: Registering block manager
 localhost:32895 with 265.4 MB RAM, BlockManagerId(driver, localhost,
 32895)
 15/02/16 05:58:46 INFO BlockManagerMaster: Registered BlockManager
 15/02/16 05:58:47 INFO SparkUI: Stopped Spark web UI at http://devzero:4040
 15/02/16 05:58:47 INFO DAGScheduler: Stopping DAGScheduler
 15/02/16 05:58:48 INFO MapOutputTrackerMasterActor: MapOutputTrackerActor
 stopped!
 15/02/16 05:58:48 INFO MemoryStore: MemoryStore cleared
 15/02/16 05:58:48 INFO BlockManager: BlockManager stopped
 15/02/16 05:58:48 INFO BlockManagerMaster: BlockManagerMaster stopped
 15/02/16 05:58:48 INFO SparkContext: Successfully stopped SparkContext
 15/02/16 05:58:48 INFO RemoteActorRefProvider$RemotingTerminator: Shutting
 down remote daemon.
 15/02/16 05:58:48 INFO RemoteActorRefProvider$RemotingTerminator: Remote
 daemon shut down; proceeding with flushing remote transports.
 15/02/16 05:58:48 INFO RemoteActorRefProvider$RemotingTerminator: Remoting
 shut down.
 Traceback (most recent call last):
   File /spark/test2.py, line 5, in module
 sc = CassandraSparkContext(conf=conf)
   File /spark/python/pyspark/context.py, line 105, in __init__
 conf, jsc)
   File /spark/pyspark_cassandra.py, line 17, in _do_init
 self._jcsc = self._jvm.CassandraJavaUtil.javaFunctions(self._jsc)
   File 

Unable to broadcast dimension tables with Spark SQL

2015-02-16 Thread Sunita Arvind
Hi Experts,

I have a large table with 54 million records (fact table), being joined
with 6 small tables (dimension tables). The on-disk size of the small tables is
within 5 KB and the record counts are in the range of 4 to 200.
All the worker nodes have 32GB of RAM allocated for Spark. I have tried the
approaches below, and it looks like the small tables are not being broadcast,
which is causing timeouts, as expected, and failure of the job.
The reason for this, AFAIK, is that the small table is also getting shuffled and
ends up in a single node's partition. Then the large table is made to
flow to the same node, which stays busy while all other nodes are idle.

Note: The spark version in use on cluster as well as my local setup is
1.1.0. I also tried with Spark 1.2.0 in the local setup, however the
queryPlan showed no change.

1. Broadcast the RDD before registering as table:
 val k = sqlContext.parquetFile(p.fileName)
 val t = sc.broadcast(k)
 t.value.registerTempTable(p.tableName)

2. Set the variable (see the note after this list):
 sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "1")


3. Added a limit to each small table before registering it as a table. I guess
this gives the optimizer a way to compute statistics and determine that the other
table is small enough for broadcast:
   sqlContext.sql("select * from a_nolim limit 7").registerTempTable("edu")

  also tried DSL style:

 a.limit(7).registerTempTable("edu")

   Tried explicit broadcasting of the tables as below:

   sc.broadcast(sqlContext.sql("select * from edu_nolim limit 7"))
     .value.registerTempTable("edu")

   and tried dsl style with broadcast done on the rdd as well

4. Used DSL style of join:
   val try2 = a1.join(cdemo,LeftOuter,Some(dem.key1.attr ===
ed.key1.attr ))

5. Ran the below commad in hive for all small tables:
   ANALYZE TABLE  tableName COMPUTE STATISTICS noscan

   Please note, the application uses SQLContext and not HiveContext. Hence
I ran the compute statistics outside of the application, from the Hue Hive
editor. I am assuming the statistics are available in the metastore;
however, I am not sure
if Spark can fetch these statistics since I am not using a HiveContext
within the application.

6. Not sure if these are valid flags, but tried with them set anyways:
  sqlContext.setConf("spark.sql.planner.dimensionJoin", "true")
  sqlContext.setConf("spark.sql.planner.generatedDimensionJoin", "true")
  sqlContext.setConf("multiWayJoin", "true")
  sqlContext.setConf("turbo", "true")

7. Tried CacheTable for all small tables. This changes the query execution
to InMemoryRelation instead of ParquetTableScan, however, shuffle -
 Exchange (HashPartitioning [i1_education_cust_demo#29], 200) remains.

8. Reduced the shuffle partition number with this parameter -
sqlContext.setConf("spark.sql.shuffle.partitions", "8"). But this did not
help.
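
A note on attempt 2 (a hedged aside, based on the 1.1/1.2 docs as far as I
recall them, not on the original thread): spark.sql.autoBroadcastJoinThreshold
is a size in bytes, so a value of 1 effectively disables broadcast joins rather
than forcing them, and the table-size statistics it consults are, in those
versions, only available for Hive metastore tables (i.e. through a HiveContext)
on which ANALYZE TABLE ... COMPUTE STATISTICS noscan has been run. A sketch of
the setting with an illustrative 10 MB threshold:

   // Hedged sketch, illustrative value: allow tables up to ~10 MB to be broadcast.
   sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", (10 * 1024 * 1024).toString)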

With all these attempts, the small tables are still getting shuffled, I
guess. Below are the queryExecutions printed on every attempt; they have
remained almost the same each time:

DSL Style execution plan(i.e.
rdd1.join(rdd2,LeftOuter,Some(rdd1.key.attr === rdd2.key.attr))
-
DSL Style execution plan -- HashOuterJoin [education#18],
[i1_education_cust_demo#29], LeftOuter, None
 Exchange (HashPartitioning [education#18], 200)
  ParquetTableScan [education#18,education_desc#19], (ParquetRelation
C:/Sunita/eclipse/workspace/branch/trial/plsresources/plsbuyer/cg_pq_cdw_education,
Some(Configuration: core-default.xml, core-site.xml, mapred-default.xml,
mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml,
hdfs-site.xml), org.apache.spark.sql.SQLContext@3bd36d4c, []), []
 Exchange (HashPartitioning [i1_education_cust_demo#29], 200)
  ParquetTableScan
[customer_id_cust_demo#20,age_dt_cust_demo#21,gndr_cd_cust_demo#22,hh_income_cust_demo#23,marital_status_cust_demo#24,ethnicity_cust_demo#25,length_of_residence_cust_demo#26,presence_of_young_adult_cust_demo#27,aged_parent_in_hh_cust_demo#28,i1_education_cust_demo#29],
(ParquetRelation
C:/Sunita/eclipse/workspace/branch/trial/plsresources/plsbuyer/cg_pq_cdw_cust_demo_dm_sample,
Some(Configuration: core-default.xml, core-site.xml, mapred-default.xml,
mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml,
hdfs-site.xml), org.apache.spark.sql.SQLContext@3bd36d4c, []), []


SQL Style execution plan (i.e sqlContext.sql(select a,b,c,d,e from t1 left
outer join t2 on t1.a = t2.a)
--
Project
[customer_id_cust_demo#20,i1_education_cust_demo#29,marital_status_cust_demo#24,hh_income_cust_demo#23,length_of_residence_cust_demo#26,ethnicity_cust_demo#25,gndr_cd_cust_demo#22,age_dt_cust_demo#21,presence_of_young_adult_cust_demo#27,aged_parent_in_hh_cust_demo#28,education_desc#19]
 HashOuterJoin [i1_education_cust_demo#29], [education#18], LeftOuter, None
  Exchange (HashPartitioning 

Re: Re: Question about spark streaming+Flume

2015-02-16 Thread bit1...@163.com
Hi Arush, 
With your code, I still didn't see the output "Received X flume events"...



bit1...@163.com
 
From: bit1...@163.com
Date: 2015-02-17 14:08
To: Arush Kharbanda
CC: user
Subject: Re: Re: Question about spark streaming+Flume
Ok, you are missing a letter in foreachRDD.. let me proceed..



bit1...@163.com
 
From: Arush Kharbanda
Date: 2015-02-17 14:31
To: bit1...@163.com
CC: user
Subject: Re: Question about spark streaming+Flume
Hi

Can you try this

val lines = FlumeUtils.createStream(ssc, "localhost", )
// Print out the count of events received from this server in each batch
   lines.count().map(cnt => "Received " + cnt + " flume events. at " +
System.currentTimeMillis() )

lines.forechRDD(_.foreach(println))

Thanks
Arush

On Tue, Feb 17, 2015 at 11:17 AM, bit1...@163.com bit1...@163.com wrote:
Hi,
I am trying Spark Streaming + Flume example:

1. Code
object SparkFlumeNGExample { 
   def main(args : Array[String]) { 
   val conf = new SparkConf().setAppName("SparkFlumeNGExample")
   val ssc = new StreamingContext(conf, Seconds(10))

   val lines = FlumeUtils.createStream(ssc, "localhost", )
// Print out the count of events received from this server in each batch
   lines.count().map(cnt => "Received " + cnt + " flume events. at " +
System.currentTimeMillis() ).print()
   ssc.start() 
   ssc.awaitTermination(); 
} 
}
2. I submit the application with following sh:
./spark-submit --deploy-mode client --name SparkFlumeEventCount --master 
spark://hadoop.master:7077 --executor-memory 512M --total-executor-cores 2 
--class spark.examples.streaming.SparkFlumeNGWordCount spark-streaming-flume.jar


When I write data to Flume, I only notice the following console information
showing that input is added.
storage.BlockManagerInfo: Added input-0-1424151807400 in memory on 
localhost:39338 (size: 1095.0 B, free: 267.2 MB) 
15/02/17 00:43:30 INFO scheduler.JobScheduler: Added jobs for time 
142415181 ms 
15/02/17 00:43:40 INFO scheduler.JobScheduler: Added jobs for time 
142415182 ms

15/02/17 00:43:40 INFO scheduler.JobScheduler: Added jobs for time 
142415187 ms

But I didn't see the output from the code: "Received X flume events"

I have no idea where the problem is. Any ideas? Thanks








-- 
Arush Kharbanda || Technical Teamlead
ar...@sigmoidanalytics.com || www.sigmoidanalytics.com


Identify the performance bottleneck from hardware perspective

2015-02-16 Thread jalafate
Hi there,

I am trying to scale up the data size that my application is handling. This
application is running on a cluster with 16 slave nodes. Each slave node has
60GB memory. It is running in standalone mode. The data is coming from HDFS,
which is also in the same local network.

In order to understand how my program is running, I also have
Ganglia installed on the cluster. From a previous run, I know the stage
taking the longest time to run is counting word pairs (my RDD consists of
sentences from a corpus). My goal is to identify the bottleneck of my
application, then modify my program or hardware configuration accordingly.

Unfortunately, I didn't find much information on Spark monitoring and
optimization topics. Reynold Xin gave a great talk at Spark Summit 2014 on
application tuning from the tasks perspective. Basically, his focus is on tasks
that are oddly slower than the average. However, it didn't solve my problem
because there are no such tasks that run way slower than the others in my case.

So I tried to identify the bottleneck from a hardware perspective. I want to
know what the limitation of the cluster is. I think if the executors are
running hard, either CPU, memory or network bandwidth (or maybe a
combination) should be hitting the roof. But Ganglia reports that the CPU utilization
of the cluster is no more than 50%, and network utilization is high for several
seconds at the beginning, then drops close to 0. From the Spark UI, I can see the
node with maximum memory usage is consuming around 6GB, while
spark.executor.memory is set to 20GB.

I am very confused that the program is not running faster, while the
hardware resources are not in short supply. Could you please give me some hints
about what decides the performance of a Spark application from a hardware
perspective?

Thanks!

Julaiti



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Identify-the-performance-bottleneck-from-hardware-prospective-tp21684.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to retrieve the value from sql.row by column name

2015-02-16 Thread Reynold Xin
BTW we merged this today: https://github.com/apache/spark/pull/4640

This should allow us in the future to address column by name in a Row.


On Mon, Feb 16, 2015 at 11:39 AM, Michael Armbrust mich...@databricks.com
wrote:

 I can unpack the code snippet a bit:

 caper.select('ran_id) is the same as saying SELECT ran_id FROM table in
 SQL.  It's always a good idea to explicitly request the columns you need
 right before using them.  That way you are tolerant of any changes to the
 schema that might happen upstream.

 The next part, .map { case Row(ranId: String) => ... }, is doing an
 extraction to pull out the values of the row into typed variables.  This is
 the same as doing .map(row => row(0).asInstanceOf[String]) or .map(row =>
 row.getString(0)), but I find this syntax easier to read since it lines
 up nicely with the select clause that comes right before it.  It's also
 less verbose, especially when pulling out a bunch of columns.
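
 Putting the two styles side by side (a minimal sketch reusing the caper table
 from the original question, and assuming ran_id is a String column):

   import org.apache.spark.sql.Row

   // Pattern-matching extraction, as described above:
   val ids = caper.select('ran_id).map { case Row(ranId: String) => ranId }

   // Equivalent ordinal-based access:
   val ids2 = caper.select('ran_id).map(row => row.getString(0))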

 Regarding the differences between python and java/scala, part of this is
 just due to the nature of these language.  Since java/scala are statically
 typed, you will always have to explicitly say the type of the column you
 are extracting (the bonus here is they are much faster than python due to
 optimizations this strictness allows).  However, since its already a little
 more verbose, we decided not to have the more expensive ability to look up
 columns in a row by name, and instead go with a faster ordinal based API.
 We could revisit this, but its not currently something we are planning to
 change.

 Michael

 On Mon, Feb 16, 2015 at 11:04 AM, Eric Bell e...@ericjbell.com wrote:

  I am just learning scala so I don't actually understand what your code
 snippet is doing but thank you, I will learn more so I can figure it out.

 I am new to all of this and still trying to make the mental shift from
 normal programming to distributed programming, but it seems to me that the
 row object would know its own schema object that it came from and be able
 to ask its schema to transform a name to a column number. Am I missing
 something or is this just a matter of time constraints and this one just
 hasn't gotten into the queue yet?

 Baring that, do the schema classes provide methods for doing this? I've
 looked and didn't see anything.

 I've just discovered that the python implementation for SchemaRDD does in
 fact allow for referencing by name and column. Why is this provided in the
 python implementation but not scala or java implementations?

 Thanks,

 --eric



 On 02/16/2015 10:46 AM, Michael Armbrust wrote:

 For efficiency the row objects don't contain the schema so you can't get
 the column by name directly.  I usually do a select followed by pattern
 matching. Something like the following:

  caper.select('ran_id).map { case Row(ranId: String) => ... }

 On Mon, Feb 16, 2015 at 8:54 AM, Eric Bell e...@ericjbell.com wrote:

 Is it possible to reference a column from a SchemaRDD using the column's
 name instead of its number?

 For example, let's say I've created a SchemaRDD from an avro file:

 val sqlContext = new SQLContext(sc)
 import sqlContext._
  val caper = sqlContext.avroFile("hdfs://localhost:9000/sma/raw_avro/caper")
  caper.registerTempTable("caper")

  scala> caper
 res20: org.apache.spark.sql.SchemaRDD = SchemaRDD[0] at RDD at
 SchemaRDD.scala:108
 == Query Plan ==
 == Physical Plan ==
 PhysicalRDD
 [ADMDISP#0,age#1,AMBSURG#2,apptdt_skew#3,APPTSTAT#4,APPTTYPE#5,ASSGNDUR#6,CANCSTAT#7,CAPERSTAT#8,COMPLAINT#9,CPT_1#10,CPT_10#11,CPT_11#12,CPT_12#13,CPT_13#14,CPT_2#15,CPT_3#16,CPT_4#17,CPT_5#18,CPT_6#19,CPT_7#20,CPT_8#21,CPT_9#22,CPTDX_1#23,CPTDX_10#24,CPTDX_11#25,CPTDX_12#26,CPTDX_13#27,CPTDX_2#28,CPTDX_3#29,CPTDX_4#30,CPTDX_5#31,CPTDX_6#32,CPTDX_7#33,CPTDX_8#34,CPTDX_9#35,CPTMOD1_1#36,CPTMOD1_10#37,CPTMOD1_11#38,CPTMOD1_12#39,CPTMOD1_13#40,CPTMOD1_2#41,CPTMOD1_3#42,CPTMOD1_4#43,CPTMOD1_5#44,CPTMOD1_6#45,CPTMOD1_7#46,CPTMOD1_8#47,CPTMOD1_9#48,CPTMOD2_1#49,CPTMOD2_10#50,CPTMOD2_11#51,CPTMOD2_12#52,CPTMOD2_13#53,CPTMOD2_2#54,CPTMOD2_3#55,CPTMOD2_4#56,CPTMOD...
  scala>

 Now I want to access fields, and of course the normal thing to do is to
 use a field name, not a field number.

  scala> val kv = caper.map(r => (r.ran_id, r))
  <console>:23: error: value ran_id is not a member of
  org.apache.spark.sql.Row
         val kv = caper.map(r => (r.ran_id, r))

 How do I do this?

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org







PySpark and Cassandra

2015-02-16 Thread Rumph, Frens Jan
Hi,

I'm trying to connect to Cassandra through PySpark using the
spark-cassandra-connector from datastax based on the work of Mike
Sukmanowsky.

I can use Spark and Cassandra through the datastax connector in Scala just
fine. Where things fail in PySpark is that an exception is raised in
org.apache.spark.api.python.PythonRDD.writeIteratorToStream(...) with the
message 'Unexpected element
type com.datastax.spark.connector.japi.CassandraRow'.

So just to be sure: is it only possible to communicate between a Python
Spark program and the rest of the Spark ecosystem through binary or UTF-8
strings? Is there no way to communicate a richer object with at least types
like a float, etc.?

Cheers,
Frens


Re: OOM error

2015-02-16 Thread Akhil Das
Increase your executor memory. Also, you can play around with increasing the
number of partitions/parallelism, etc.
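
A minimal sketch of what that tuning can look like when building the context
(the values here are illustrative only, not taken from the thread; they have
to fit within the 15 GB m3.xlarge nodes and however many executors run per node):

  val conf = new org.apache.spark.SparkConf()
    .setAppName("my-app")                        // hypothetical app name
    .set("spark.executor.memory", "4g")          // up from 2g, if it still fits on the node
    .set("spark.default.parallelism", "480")     // more, smaller partitions for the mapToPair stage
  val sc = new org.apache.spark.SparkContext(conf)

The same two knobs can also be passed on the command line via --executor-memory
and --conf spark.default.parallelism=... on spark-submit.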

Thanks
Best Regards

On Tue, Feb 17, 2015 at 3:39 AM, Harshvardhan Chauhan ha...@gumgum.com
wrote:

 Hi All,


 I need some help with Out Of Memory errors in my application. I am using
 Spark 1.1.0 and my application is using Java API. I am running my app on
 EC2  25 m3.xlarge (4 Cores 15GB Memory) instances. The app only fails
 sometimes. Lots of mapToPair tasks a failing.  My app is configured to run
 120 executors and executor memory is 2G.

 These are various errors i see the in my logs.

 15/02/16 10:53:48 INFO storage.MemoryStore: Block broadcast_1 of size 4680 
 dropped from memory (free 257277829)
 15/02/16 10:53:49 WARN channel.DefaultChannelPipeline: An exception was 
 thrown by a user handler while handling an exception event ([id: 0x6e0138a3, 
 /10.61.192.194:35196 = /10.164.164.228:49445] EXCEPTION: 
 java.lang.OutOfMemoryError: Java heap space)
 java.lang.OutOfMemoryError: Java heap space
   at java.nio.HeapByteBuffer.init(HeapByteBuffer.java:57)
   at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
   at 
 org.jboss.netty.buffer.CompositeChannelBuffer.toByteBuffer(CompositeChannelBuffer.java:649)
   at 
 org.jboss.netty.buffer.AbstractChannelBuffer.toByteBuffer(AbstractChannelBuffer.java:530)
   at 
 org.jboss.netty.channel.socket.nio.SocketSendBufferPool.acquire(SocketSendBufferPool.java:77)
   at 
 org.jboss.netty.channel.socket.nio.SocketSendBufferPool.acquire(SocketSendBufferPool.java:46)
   at 
 org.jboss.netty.channel.socket.nio.AbstractNioWorker.write0(AbstractNioWorker.java:194)
   at 
 org.jboss.netty.channel.socket.nio.AbstractNioWorker.writeFromTaskLoop(AbstractNioWorker.java:152)
   at 
 org.jboss.netty.channel.socket.nio.AbstractNioChannel$WriteTask.run(AbstractNioChannel.java:335)
   at 
 org.jboss.netty.channel.socket.nio.AbstractNioSelector.processTaskQueue(AbstractNioSelector.java:366)
   at 
 org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:290)
   at 
 org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:90)
   at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
   at 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
   at 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
   at java.lang.Thread.run(Thread.java:745)
 15/02/16 10:53:49 WARN channel.DefaultChannelPipeline: An exception was 
 thrown by a user handler while handling an exception event ([id: 0x2d0c1db1, 
 /10.169.226.254:55790 = /10.164.164.228:49445] EXCEPTION: 
 java.lang.OutOfMemoryError: Java heap space)
 java.lang.OutOfMemoryError: Java heap space
   at java.nio.HeapByteBuffer.init(HeapByteBuffer.java:57)
   at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
   at 
 org.jboss.netty.buffer.CompositeChannelBuffer.toByteBuffer(CompositeChannelBuffer.java:649)
   at 
 org.jboss.netty.buffer.AbstractChannelBuffer.toByteBuffer(AbstractChannelBuffer.java:530)
   at 
 org.jboss.netty.channel.socket.nio.SocketSendBufferPool.acquire(SocketSendBufferPool.java:77)
   at 
 org.jboss.netty.channel.socket.nio.SocketSendBufferPool.acquire(SocketSendBufferPool.java:46)
   at 
 org.jboss.netty.channel.socket.nio.AbstractNioWorker.write0(AbstractNioWorker.java:194)
   at 
 org.jboss.netty.channel.socket.nio.AbstractNioWorker.writeFromTaskLoop(AbstractNioWorker.java:152)
   at 
 org.jboss.netty.channel.socket.nio.AbstractNioChannel$WriteTask.run(AbstractNioChannel.java:335)
   at 
 org.jboss.netty.channel.socket.nio.AbstractNioSelector.processTaskQueue(AbstractNioSelector.java:366)
   at 
 org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:290)
   at 
 org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:90)
   at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
   at 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
   at 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
   at java.lang.Thread.run(Thread.java:745)
 15/02/16 10:53:50 WARN channel.DefaultChannelPipeline: An exception was 
 thrown by a user handler while handling an exception event ([id: 0xd4211985, 
 /10.181.125.52:60959 = /10.164.164.228:49445] EXCEPTION: 
 java.lang.OutOfMemoryError: Java heap space)
 java.lang.OutOfMemoryError: Java heap space
   at java.nio.HeapByteBuffer.init(HeapByteBuffer.java:57)
   at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
   at 
 org.jboss.netty.buffer.CompositeChannelBuffer.toByteBuffer(CompositeChannelBuffer.java:649)
   at 
 org.jboss.netty.buffer.AbstractChannelBuffer.toByteBuffer(AbstractChannelBuffer.java:530)
   at 
 

Re: spark-local dir running out of space during long ALS run

2015-02-16 Thread Davies Liu
For the last question, you can trigger GC in the JVM from Python by:

sc._jvm.System.gc()
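
In Scala, the driver-side pattern TD describes below would look roughly like
this (a hedged sketch with illustrative names - ratings, rank, numIterations
and numRuns are placeholders; in PySpark the GC call is the sc._jvm line above):

  import org.apache.spark.mllib.recommendation.ALS

  for (run <- 1 to numRuns) {
    val model = ALS.trainImplicit(ratings, rank, numIterations)
    // ... evaluate or save the model for this run ...
    System.gc()  // prompt the driver JVM to collect unreferenced RDDs so their files can be cleaned up
  }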

On Mon, Feb 16, 2015 at 4:08 PM, Antony Mayi
antonym...@yahoo.com.invalid wrote:
 thanks, that looks promissing but can't find any reference giving me more
 details - can you please point me to something? Also is it possible to force
 GC from pyspark (as I am using pyspark)?

 thanks,
 Antony.


 On Monday, 16 February 2015, 21:05, Tathagata Das
 tathagata.das1...@gmail.com wrote:



 Correct, brute force clean up is not useful. Since Spark 1.0, Spark can do
 automatic cleanup of files based on which RDDs are used/garbage collected by
 JVM. That would be the best way, but depends on the JVM GC characteristics.
 If you force a GC periodically in the driver that might help you get rid of
 files in the workers that are not needed.

 TD

 On Mon, Feb 16, 2015 at 12:27 AM, Antony Mayi antonym...@yahoo.com.invalid
 wrote:

 spark.cleaner.ttl is not the right way - seems to be really designed for
 streaming. although it keeps the disk usage under control it also causes
 loss of rdds and broadcasts that are required later leading to crash.

 is there any other way?
 thanks,
 Antony.


 On Sunday, 15 February 2015, 21:42, Antony Mayi antonym...@yahoo.com
 wrote:



 spark.cleaner.ttl ?


 On Sunday, 15 February 2015, 18:23, Antony Mayi antonym...@yahoo.com
 wrote:



 Hi,

 I am running bigger ALS on spark 1.2.0 on yarn (cdh 5.3.0) - ALS is using
 about 3 billions of ratings and I am doing several trainImplicit() runs in
 loop within one spark session. I have four node cluster with 3TB disk space
 on each. before starting the job there is less then 8% of the disk space
 used. while the ALS is running I can see the disk usage rapidly growing
 mainly because of files being stored under
 yarn/local/usercache/user/appcache/application_XXX_YYY/spark-local-ZZZ-AAA.
 after about 10 hours the disk usage hits 90% and yarn kills the particular
 containers.

 am I missing doing some cleanup somewhere while looping over the several
 trainImplicit() calls? taking 4*3TB of disk space seems immense.

 thanks for any help,
 Antony.









-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Problem getting pyspark-cassandra and pyspark working

2015-02-16 Thread Mohamed Lrhazi
Will do. Thanks a lot.


On Mon, Feb 16, 2015 at 7:20 PM, Davies Liu dav...@databricks.com wrote:

 Can you try the example in pyspark-cassandra?

 If not, you could create a issue there.

 On Mon, Feb 16, 2015 at 4:07 PM, Mohamed Lrhazi
 mohamed.lrh...@georgetown.edu wrote:
  So I tired building the connector from:
  https://github.com/datastax/spark-cassandra-connector
 
  which seems to include the java class referenced in the error message:
 
  [root@devzero spark]# unzip -l
 
 spark-cassandra-connector/spark-cassandra-connector-java/target/scala-2.10/spark-cassandra-connector-java-assembly-1.2.0-SNAPSHOT.jar
  |grep CassandraJavaUtil
 
  14612  02-16-2015 23:25
  com/datastax/spark/connector/japi/CassandraJavaUtil.class
 
  [root@devzero spark]#
 
 
  When I try running my spark test job, I still get the exact same error,
 even
  though both my jars seems to have been processed by spark.
 
 
  ...
  15/02/17 00:00:45 INFO SparkUI: Started SparkUI at http://devzero:4040
  15/02/17 00:00:45 INFO SparkContext: Added JAR
  file:/spark/pyspark-cassandra-0.1-SNAPSHOT.jar at
  http://10.212.55.42:36929/jars/pyspark-cassandra-0.1-SNAPSHOT.jar with
  timestamp 1424131245595
  15/02/17 00:00:45 INFO SparkContext: Added JAR
  file:/spark/spark-cassandra-connector-java-assembly-1.2.0-SNAPSHOT.jar at
 
 http://10.212.55.42:36929/jars/spark-cassandra-connector-java-assembly-1.2.0-SNAPSHOT.jar
  with timestamp 1424131245623
  15/02/17 00:00:45 INFO Utils: Copying /spark/test2.py to
 
 /tmp/spark-8588b528-d016-42ac-aa7c-e8cf07c1b659/spark-ae3141dd-ae6c-4e99-b7c8-f97ccb3fd8e5/test2.py
  15/02/17 00:00:45 INFO SparkContext: Added file file:/spark/test2.py at
  http://10.212.55.42:36929/files/test2.py with timestamp 1424131245624
  15/02/17 00:00:45 INFO Utils: Copying /spark/pyspark_cassandra.py to
 
 /tmp/spark-8588b528-d016-42ac-aa7c-e8cf07c1b659/spark-ae3141dd-ae6c-4e99-b7c8-f97ccb3fd8e5/pyspark_cassandra.py
  15/02/17 00:00:45 INFO SparkContext: Added file
  file:/spark/pyspark_cassandra.py at
  http://10.212.55.42:36929/files/pyspark_cassandra.py with timestamp
  1424131245633
  15/02/17 00:00:45 INFO Executor: Starting executor ID driver on host
  localhost
  15/
  
  15/02/17 00:00:47 INFO RemoteActorRefProvider$RemotingTerminator:
 Remoting
  shut down.
  Traceback (most recent call last):
File /spark/test2.py, line 5, in module
  sc = CassandraSparkContext(conf=conf)
File /spark/python/pyspark/context.py, line 105, in __init__
  conf, jsc)
File /spark/pyspark_cassandra.py, line 17, in _do_init
  self._jcsc = self._jvm.CassandraJavaUtil.javaFunctions(self._jsc)
File /spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py,
 line
  726, in __getattr__
  py4j.protocol.Py4JError: Trying to call a package.
 
 
  am I building the wrong connector jar? or using the wrong jar?
 
  Thanks a lot,
  Mohamed.
 
 
 
  On Mon, Feb 16, 2015 at 5:46 PM, Mohamed Lrhazi
  mohamed.lrh...@georgetown.edu wrote:
 
  Oh, I don't know. thanks a lot Davies, gonna figure that out now
 
  On Mon, Feb 16, 2015 at 5:31 PM, Davies Liu dav...@databricks.com
 wrote:
 
  It also need the Cassandra jar:
  com.datastax.spark.connector.CassandraJavaUtil
 
  Is it included in  /spark/pyspark-cassandra-0.1-SNAPSHOT.jar ?
 
 
 
  On Mon, Feb 16, 2015 at 1:20 PM, Mohamed Lrhazi
  mohamed.lrh...@georgetown.edu wrote:
   Yes, am sure the system cant find the jar.. but how do I fix that...
 my
   submit command includes the jar:
  
   /spark/bin/spark-submit --py-files /spark/pyspark_cassandra.py --jars
   /spark/pyspark-cassandra-0.1-SNAPSHOT.jar --driver-class-path
   /spark/pyspark-cassandra-0.1-SNAPSHOT.jar /spark/test2.py
  
   and the spark output seems to indicate it is handling it:
  
   15/02/16 05:58:46 INFO SparkContext: Added JAR
   file:/spark/pyspark-cassandra-0.1-SNAPSHOT.jar at
   http://10.212.55.42:56642/jars/pyspark-cassandra-0.1-SNAPSHOT.jar
 with
   timestamp 1424066326632
  
  
   I don't really know what else I could try any suggestions highly
   appreciated.
  
   Thanks,
   Mohamed.
  
  
   On Mon, Feb 16, 2015 at 4:04 PM, Davies Liu dav...@databricks.com
   wrote:
  
   It seems that the jar for cassandra is not loaded, you should have
   them in the classpath.
  
   On Mon, Feb 16, 2015 at 12:08 PM, Mohamed Lrhazi
   mohamed.lrh...@georgetown.edu wrote:
Hello all,
   
Trying the example code from this package
(https://github.com/Parsely/pyspark-cassandra) , I always get
 this
error...
   
Can you see what I am doing wrong? from googling arounf it seems
 to
be
that
the jar is not found somehow...  The spark log shows the JAR was
processed
at least.
   
Thank you so much.
   
am using spark-1.2.1-bin-hadoop2.4.tgz
   
test2.py is simply:
   
from pyspark.context import SparkConf
from pyspark_cassandra import CassandraSparkContext,
 saveToCassandra
 conf = SparkConf().setAppName("PySpark Cassandra Sample Driver")

Re: Use of nscala-time within spark-shell

2015-02-16 Thread Kevin (Sangwoo) Kim
What Scala version was used to build your Spark?
It seems your nscala-time library's Scala version is 2.11,
while the default Spark Scala version is 2.10.
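
In other words (a hedged sketch, assuming Spark was built against Scala 2.10):
use the _2.10 build of nscala-time, e.g. put nscala-time_2.10-1.8.0.jar on
ADD_JARS instead of the _2.11 jar, or in an sbt build let %% pick the matching
artifact:

  // build.sbt sketch - with a 2.10.x scalaVersion, %% resolves to nscala-time_2.10
  scalaVersion := "2.10.4"
  libraryDependencies += "com.github.nscala-time" %% "nscala-time" % "1.8.0"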


On Tue Feb 17 2015 at 1:51:47 AM Hammam CHAMSI hscha...@hotmail.com wrote:

 Hi All,

 Thanks in advance for your help. I have timestamp which I need to convert
 to datetime using scala. A folder contains the three needed jar files:
 joda-convert-1.5.jar  joda-time-2.4.jar  nscala-time_2.11-1.8.0.jar
 Using scala REPL and adding the jars: scala -classpath *.jar
 I can use nscala-time like following:

 scala import com.github.nscala_time.time.Imports._
 import com.github.nscala_time.time.Imports._

 scala import org.joda._
 import org.joda._

 scala DateTime.now
 res0: org.joda.time.DateTime = 2015-02-12T15:51:46.928+01:00

 But when i try to use spark-shell:
 ADD_JARS=/home/scala_test_class/nscala-time_2.11-1.8.0.jar,/home/scala_test_class/joda-time-2.4.jar,/home/scala_test_class/joda-convert-1.5.jar
 /usr/local/spark/bin/spark-shell --master local --driver-memory 2g
 --executor-memory 2g --executor-cores 1

 It successfully imports the jars:

 scala import com.github.nscala_time.time.Imports._
 import com.github.nscala_time.time.Imports._

 scala import org.joda._
 import org.joda._

 but fails using them
 scala DateTime.now
 java.lang.NoSuchMethodError:
 scala.Predef$.$conforms()Lscala/Predef$$less$colon$less;
 at
 com.github.nscala_time.time.LowPriorityOrderingImplicits$class.ReadableInstantOrdering(Implicits.scala:69)

 at
 com.github.nscala_time.time.Imports$.ReadableInstantOrdering(Imports.scala:20)

 at
 com.github.nscala_time.time.OrderingImplicits$class.$init$(Implicits.scala:61)

 at com.github.nscala_time.time.Imports$.init(Imports.scala:20)
 at com.github.nscala_time.time.Imports$.clinit(Imports.scala)
 at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:17)
 at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:22)
 at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:24)
 at $iwC$$iwC$$iwC$$iwC$$iwC.init(console:26)
 at $iwC$$iwC$$iwC$$iwC.init(console:28)
 at $iwC$$iwC$$iwC.init(console:30)
 at $iwC$$iwC.init(console:32)
 at $iwC.init(console:34)
 at init(console:36)
 at .init(console:40)
 at .clinit(console)
 at .init(console:7)
 at .clinit(console)
 at $print(console)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)

 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

 at java.lang.reflect.Method.invoke(Method.java:606)
 at
 org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852)
 at
 org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1125)
 at
 org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:674)
 at
 org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:705)
 at
 org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:669)
 at
 org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:828)
 at
 org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:873)

 at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:785)
 at
 org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:628)
 at
 org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:636)
 at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:641)
 at
 org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:968)

 at
 org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916)

 at
 org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916)

 at
 scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)

 at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:916)
 at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1011)
 at org.apache.spark.repl.Main$.main(Main.scala:31)
 at org.apache.spark.repl.Main.main(Main.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)

 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

 at java.lang.reflect.Method.invoke(Method.java:606)
 at
 org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
 at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
 at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

 Your help is very appreciated,

 Regards,

 Hammam



Re: Problem getting pyspark-cassandra and pyspark working

2015-02-16 Thread Mohamed Lrhazi
So I tried building the connector from:
https://github.com/datastax/spark-cassandra-connector

which seems to include the java class referenced in the error message:

[root@devzero spark]# unzip -l
spark-cassandra-connector/spark-cassandra-connector-java/target/scala-2.10/spark-cassandra-connector-java-assembly-1.2.0-SNAPSHOT.jar
|grep CassandraJavaUtil

14612  02-16-2015 23:25
com/datastax/spark/connector/japi/CassandraJavaUtil.class

[root@devzero spark]#


When I try running my spark test job, I still get the exact same error,
even though both my jars seem to have been processed by Spark.


...
15/02/17 00:00:45 INFO SparkUI: Started SparkUI at http://devzero:4040
15/02/17 00:00:45 INFO SparkContext: Added JAR
file:/spark/pyspark-cassandra-0.1-SNAPSHOT.jar at
http://10.212.55.42:36929/jars/pyspark-cassandra-0.1-SNAPSHOT.jar with
timestamp 1424131245595
15/02/17 00:00:45 INFO SparkContext: Added JAR
file:/spark/spark-cassandra-connector-java-assembly-1.2.0-SNAPSHOT.jar at
http://10.212.55.42:36929/jars/spark-cassandra-connector-java-assembly-1.2.0-SNAPSHOT.jar
with timestamp 1424131245623
15/02/17 00:00:45 INFO Utils: Copying /spark/test2.py to
/tmp/spark-8588b528-d016-42ac-aa7c-e8cf07c1b659/spark-ae3141dd-ae6c-4e99-b7c8-f97ccb3fd8e5/test2.py
15/02/17 00:00:45 INFO SparkContext: Added file file:/spark/test2.py at
http://10.212.55.42:36929/files/test2.py with timestamp 1424131245624
15/02/17 00:00:45 INFO Utils: Copying /spark/pyspark_cassandra.py to
/tmp/spark-8588b528-d016-42ac-aa7c-e8cf07c1b659/spark-ae3141dd-ae6c-4e99-b7c8-f97ccb3fd8e5/pyspark_cassandra.py
15/02/17 00:00:45 INFO SparkContext: Added file
file:/spark/pyspark_cassandra.py at
http://10.212.55.42:36929/files/pyspark_cassandra.py with timestamp
1424131245633
15/02/17 00:00:45 INFO Executor: Starting executor ID driver on host
localhost
15/

15/02/17 00:00:47 INFO RemoteActorRefProvider$RemotingTerminator: Remoting
shut down.
Traceback (most recent call last):
  File /spark/test2.py, line 5, in module
sc = CassandraSparkContext(conf=conf)
  File /spark/python/pyspark/context.py, line 105, in __init__
conf, jsc)
  File /spark/pyspark_cassandra.py, line 17, in _do_init
self._jcsc = self._jvm.CassandraJavaUtil.javaFunctions(self._jsc)
  File /spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py, line
726, in __getattr__
py4j.protocol.Py4JError: Trying to call a package.


Am I building the wrong connector jar, or using the wrong jar?

Thanks a lot,
Mohamed.



On Mon, Feb 16, 2015 at 5:46 PM, Mohamed Lrhazi 
mohamed.lrh...@georgetown.edu wrote:

 Oh, I don't know. thanks a lot Davies, gonna figure that out now

 On Mon, Feb 16, 2015 at 5:31 PM, Davies Liu dav...@databricks.com wrote:

 It also need the Cassandra jar:
 com.datastax.spark.connector.CassandraJavaUtil

 Is it included in  /spark/pyspark-cassandra-0.1-SNAPSHOT.jar ?



 On Mon, Feb 16, 2015 at 1:20 PM, Mohamed Lrhazi
 mohamed.lrh...@georgetown.edu wrote:
  Yes, am sure the system cant find the jar.. but how do I fix that... my
  submit command includes the jar:
 
  /spark/bin/spark-submit --py-files /spark/pyspark_cassandra.py --jars
  /spark/pyspark-cassandra-0.1-SNAPSHOT.jar --driver-class-path
  /spark/pyspark-cassandra-0.1-SNAPSHOT.jar /spark/test2.py
 
  and the spark output seems to indicate it is handling it:
 
  15/02/16 05:58:46 INFO SparkContext: Added JAR
  file:/spark/pyspark-cassandra-0.1-SNAPSHOT.jar at
  http://10.212.55.42:56642/jars/pyspark-cassandra-0.1-SNAPSHOT.jar with
  timestamp 1424066326632
 
 
  I don't really know what else I could try any suggestions highly
  appreciated.
 
  Thanks,
  Mohamed.
 
 
  On Mon, Feb 16, 2015 at 4:04 PM, Davies Liu dav...@databricks.com
 wrote:
 
  It seems that the jar for cassandra is not loaded, you should have
  them in the classpath.
 
  On Mon, Feb 16, 2015 at 12:08 PM, Mohamed Lrhazi
  mohamed.lrh...@georgetown.edu wrote:
   Hello all,
  
   Trying the example code from this package
   (https://github.com/Parsely/pyspark-cassandra) , I always get this
   error...
  
   Can you see what I am doing wrong? from googling arounf it seems to
 be
   that
   the jar is not found somehow...  The spark log shows the JAR was
   processed
   at least.
  
   Thank you so much.
  
   am using spark-1.2.1-bin-hadoop2.4.tgz
  
   test2.py is simply:
  
   from pyspark.context import SparkConf
   from pyspark_cassandra import CassandraSparkContext, saveToCassandra
    conf = SparkConf().setAppName("PySpark Cassandra Sample Driver")
    conf.set("spark.cassandra.connection.host", "devzero")
   sc = CassandraSparkContext(conf=conf)
  
   [root@devzero spark]# /usr/local/bin/docker-enter  spark-master
 bash -c
   /spark/bin/spark-submit --py-files /spark/pyspark_cassandra.py
 --jars
   /spark/pyspark-cassandra-0.1-SNAPSHOT.jar --driver-class-path
   /spark/pyspark-cassandra-0.1-SNAPSHOT.jar /spark/test2.py
   ...
   15/02/16 05:58:45 INFO Slf4jLogger: Slf4jLogger started
   15/02/16 05:58:45 INFO Remoting: 

Re: spark-local dir running out of space during long ALS run

2015-02-16 Thread Antony Mayi
Thanks, that looks promising, but I can't find any reference giving me more
details - can you please point me to something? Also, is it possible to force GC
from pyspark (as I am using pyspark)?
Thanks, Antony.

 On Monday, 16 February 2015, 21:05, Tathagata Das 
tathagata.das1...@gmail.com wrote:
   
 

 Correct, brute force clean up is not useful. Since Spark 1.0, Spark can do 
automatic cleanup of files based on which RDDs are used/garbage collected by 
JVM. That would be the best way, but depends on the JVM GC characteristics. If 
you force a GC periodically in the driver that might help you get rid of files 
in the workers that are not needed.
TD
On Mon, Feb 16, 2015 at 12:27 AM, Antony Mayi antonym...@yahoo.com.invalid 
wrote:

spark.cleaner.ttl is not the right way - it seems to be really designed for
streaming. Although it keeps the disk usage under control, it also causes loss
of rdds and broadcasts that are required later, leading to a crash.
Is there any other way? Thanks, Antony.

 On Sunday, 15 February 2015, 21:42, Antony Mayi antonym...@yahoo.com 
wrote:
   
 

 spark.cleaner.ttl ? 

 On Sunday, 15 February 2015, 18:23, Antony Mayi antonym...@yahoo.com 
wrote:
   
 

 Hi,
I am running a bigger ALS job on Spark 1.2.0 on YARN (CDH 5.3.0) - ALS is using about
3 billion ratings and I am doing several trainImplicit() runs in a loop
within one Spark session. I have a four node cluster with 3TB of disk space on each.
Before starting the job, less than 8% of the disk space is used. While the
ALS is running I can see the disk usage rapidly growing, mainly because of files
being stored under
yarn/local/usercache/user/appcache/application_XXX_YYY/spark-local-ZZZ-AAA.
After about 10 hours the disk usage hits 90% and YARN kills the particular
containers.
Am I missing some cleanup somewhere while looping over the several
trainImplicit() calls? Taking 4*3TB of disk space seems immense.
Thanks for any help, Antony.

 


 




 
   

Re: Shuffle on joining two RDDs

2015-02-16 Thread Davies Liu
This will be fixed by https://github.com/apache/spark/pull/4629

On Fri, Feb 13, 2015 at 10:43 AM, Imran Rashid iras...@cloudera.com wrote:
 yeah I thought the same thing at first too, I suggested something equivalent
 w/ preservesPartitioning = true, but that isn't enough.  the join is done by
 union-ing the two transformed rdds, which is very different from the way it
 works under the hood in scala to enable narrow dependencies.  It really
 needs a bigger change to pyspark.  I filed this issue:
 https://issues.apache.org/jira/browse/SPARK-5785

 (and the somewhat related issue about documentation:
 https://issues.apache.org/jira/browse/SPARK-5786)

 partitioning should still work in pyspark, you still need some notion of
 distributing work, and the pyspark functions have a partitionFunc to decide
 that.  But, I am not an authority on pyspark, so perhaps there are more
 holes I'm not aware of ...

 Imran

 On Fri, Feb 13, 2015 at 8:36 AM, Karlson ksonsp...@siberie.de wrote:

 In https://github.com/apache/spark/blob/master/python/pyspark/join.py#L38,
 wouldn't it help to change the lines

 vs = rdd.map(lambda (k, v): (k, (1, v)))
 ws = other.map(lambda (k, v): (k, (2, v)))

 to

 vs = rdd.mapValues(lambda v: (1, v))
 ws = other.mapValues(lambda v: (2, v))

 ?
 As I understand, this would preserve the original partitioning.



 On 2015-02-13 12:43, Karlson wrote:

 Does that mean partitioning does not work in Python? Or does this only
 effect joining?

 On 2015-02-12 19:27, Davies Liu wrote:

 The feature works as expected in Scala/Java, but not implemented in
 Python.

 On Thu, Feb 12, 2015 at 9:24 AM, Imran Rashid iras...@cloudera.com
 wrote:

 I wonder if the issue is that these lines just need to add
 preservesPartitioning = true
 ?

 https://github.com/apache/spark/blob/master/python/pyspark/join.py#L38

 I am getting the feeling this is an issue w/ pyspark


 On Thu, Feb 12, 2015 at 10:43 AM, Imran Rashid iras...@cloudera.com
 wrote:


 ah, sorry I am not too familiar w/ pyspark, sorry I missed that part.
 It
 could be that pyspark doesn't properly support narrow dependencies, or
 maybe
 you need to be more explicit about the partitioner.  I am looking into
 the
 pyspark api but you might have some better guesses here than I
 thought.

 My suggestion to do

 joinedRdd.getPartitions.foreach{println}

 was just to see if the partition was a NarrowCoGroupSplitDep or a
 ShuffleCoGroupSplitDep -- but it was actually a bad suggestion, those
 fields
 are hidden deeper inside and are not user-visible.  But I think a
 better way
 (in scala, anyway) is to look at rdd.dependencies.  its a little
 tricky,
 though, you need to look deep into the lineage (example at the end).

 Sean -- yes it does require both RDDs have the same partitioner, but
 that
 should happen naturally if you just specify the same number of
 partitions,
 you'll get equal HashPartitioners.  There is a little difference in
 the
  scala & python api that I missed here.  For partitionBy in scala, you
 actually need to specify the partitioner, but not in python.  However
 I
 thought it would work like groupByKey, which does just take an int.


 Here's a code example in scala -- not sure what is available from
 python.
 Hopefully somebody knows a simpler way to confirm narrow
 dependencies??

  val d = sc.parallelize(1 to 1e6.toInt).map{x => x ->
  x}.groupByKey(64)
  val d2 = sc.parallelize(3 to 1e6.toInt).map{x => x ->
  x}.groupByKey(64)
  scala> d.partitioner == d2.partitioner
  res2: Boolean = true
  val joined = d.join(d2)
  val d3 = sc.parallelize(3 to 1e6.toInt).map{x => x ->
  x}.groupByKey(100)
  val badJoined = d.join(d3)

 d.setName(d)
 d2.setName(d2)
 d3.setName(d3)
 joined.setName(joined)
 badJoined.setName(badJoined)


  //unfortunately, just looking at the immediate dependencies of joined &
  badJoined is misleading, b/c join actually creates
  // one more step after the shuffle
  scala> joined.dependencies
  res20: Seq[org.apache.spark.Dependency[_]] =
  List(org.apache.spark.OneToOneDependency@74751ac8)
  //even with the join that does require a shuffle, we still see a
  OneToOneDependency, but that's just a simple flatMap step
  scala> badJoined.dependencies
  res21: Seq[org.apache.spark.Dependency[_]] =
  List(org.apache.spark.OneToOneDependency@1cf356cc)





   //so let's make a helper function to get all the dependencies
  recursively

  def flattenDeps(rdd: RDD[_]): Seq[(RDD[_], Dependency[_])] = {
    val deps = rdd.dependencies
    deps.map{rdd -> _} ++ deps.flatMap{dep => flattenDeps(dep.rdd)}
  }


 //full dependencies of the good join

  scala> flattenDeps(joined).foreach{println}
 (joined FlatMappedValuesRDD[9] at join at
 console:16,org.apache.spark.OneToOneDependency@74751ac8)
 (MappedValuesRDD[8] at join at
 console:16,org.apache.spark.OneToOneDependency@623264af)
 (CoGroupedRDD[7] at join at
 console:16,org.apache.spark.OneToOneDependency@5a704f86)
 (CoGroupedRDD[7] at join at
 console:16,org.apache.spark.OneToOneDependency@37514cd)
 (d 

Re: Problem getting pyspark-cassandra and pyspark working

2015-02-16 Thread Davies Liu
Can you try the example in pyspark-cassandra?

If not, you could create an issue there.

On Mon, Feb 16, 2015 at 4:07 PM, Mohamed Lrhazi
mohamed.lrh...@georgetown.edu wrote:
 So I tired building the connector from:
 https://github.com/datastax/spark-cassandra-connector

 which seems to include the java class referenced in the error message:

 [root@devzero spark]# unzip -l
 spark-cassandra-connector/spark-cassandra-connector-java/target/scala-2.10/spark-cassandra-connector-java-assembly-1.2.0-SNAPSHOT.jar
 |grep CassandraJavaUtil

 14612  02-16-2015 23:25
 com/datastax/spark/connector/japi/CassandraJavaUtil.class

 [root@devzero spark]#


 When I try running my spark test job, I still get the exact same error, even
 though both my jars seems to have been processed by spark.


 ...
 15/02/17 00:00:45 INFO SparkUI: Started SparkUI at http://devzero:4040
 15/02/17 00:00:45 INFO SparkContext: Added JAR
 file:/spark/pyspark-cassandra-0.1-SNAPSHOT.jar at
 http://10.212.55.42:36929/jars/pyspark-cassandra-0.1-SNAPSHOT.jar with
 timestamp 1424131245595
 15/02/17 00:00:45 INFO SparkContext: Added JAR
 file:/spark/spark-cassandra-connector-java-assembly-1.2.0-SNAPSHOT.jar at
 http://10.212.55.42:36929/jars/spark-cassandra-connector-java-assembly-1.2.0-SNAPSHOT.jar
 with timestamp 1424131245623
 15/02/17 00:00:45 INFO Utils: Copying /spark/test2.py to
 /tmp/spark-8588b528-d016-42ac-aa7c-e8cf07c1b659/spark-ae3141dd-ae6c-4e99-b7c8-f97ccb3fd8e5/test2.py
 15/02/17 00:00:45 INFO SparkContext: Added file file:/spark/test2.py at
 http://10.212.55.42:36929/files/test2.py with timestamp 1424131245624
 15/02/17 00:00:45 INFO Utils: Copying /spark/pyspark_cassandra.py to
 /tmp/spark-8588b528-d016-42ac-aa7c-e8cf07c1b659/spark-ae3141dd-ae6c-4e99-b7c8-f97ccb3fd8e5/pyspark_cassandra.py
 15/02/17 00:00:45 INFO SparkContext: Added file
 file:/spark/pyspark_cassandra.py at
 http://10.212.55.42:36929/files/pyspark_cassandra.py with timestamp
 1424131245633
 15/02/17 00:00:45 INFO Executor: Starting executor ID driver on host
 localhost
 15/
 
 15/02/17 00:00:47 INFO RemoteActorRefProvider$RemotingTerminator: Remoting
 shut down.
 Traceback (most recent call last):
   File "/spark/test2.py", line 5, in <module>
 sc = CassandraSparkContext(conf=conf)
   File "/spark/python/pyspark/context.py", line 105, in __init__
 conf, jsc)
   File "/spark/pyspark_cassandra.py", line 17, in _do_init
 self._jcsc = self._jvm.CassandraJavaUtil.javaFunctions(self._jsc)
   File "/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line
 726, in __getattr__
 py4j.protocol.Py4JError: Trying to call a package.


 am I building the wrong connector jar? or using the wrong jar?

 Thanks a lot,
 Mohamed.



 On Mon, Feb 16, 2015 at 5:46 PM, Mohamed Lrhazi
 mohamed.lrh...@georgetown.edu wrote:

 Oh, I don't know. thanks a lot Davies, gonna figure that out now

 On Mon, Feb 16, 2015 at 5:31 PM, Davies Liu dav...@databricks.com wrote:

 It also need the Cassandra jar:
 com.datastax.spark.connector.CassandraJavaUtil

 Is it included in  /spark/pyspark-cassandra-0.1-SNAPSHOT.jar ?



 On Mon, Feb 16, 2015 at 1:20 PM, Mohamed Lrhazi
 mohamed.lrh...@georgetown.edu wrote:
  Yes, I am sure the system can't find the jar... but how do I fix that? My
  submit command includes the jar:
 
  /spark/bin/spark-submit --py-files /spark/pyspark_cassandra.py --jars
  /spark/pyspark-cassandra-0.1-SNAPSHOT.jar --driver-class-path
  /spark/pyspark-cassandra-0.1-SNAPSHOT.jar /spark/test2.py
 
  and the spark output seems to indicate it is handling it:
 
  15/02/16 05:58:46 INFO SparkContext: Added JAR
  file:/spark/pyspark-cassandra-0.1-SNAPSHOT.jar at
  http://10.212.55.42:56642/jars/pyspark-cassandra-0.1-SNAPSHOT.jar with
  timestamp 1424066326632
 
 
  I don't really know what else I could try; any suggestions highly
  appreciated.
 
  Thanks,
  Mohamed.
 
 
  On Mon, Feb 16, 2015 at 4:04 PM, Davies Liu dav...@databricks.com
  wrote:
 
  It seems that the jar for cassandra is not loaded, you should have
  them in the classpath.
 
  On Mon, Feb 16, 2015 at 12:08 PM, Mohamed Lrhazi
  mohamed.lrh...@georgetown.edu wrote:
   Hello all,
  
   Trying the example code from this package
   (https://github.com/Parsely/pyspark-cassandra) , I always get this
   error...
  
    Can you see what I am doing wrong? From googling around it seems to be
    that the jar is not found somehow... The Spark log shows the JAR was
    processed at least.
  
   Thank you so much.
  
   am using spark-1.2.1-bin-hadoop2.4.tgz
  
   test2.py is simply:
  
    from pyspark.context import SparkConf
    from pyspark_cassandra import CassandraSparkContext, saveToCassandra
    conf = SparkConf().setAppName("PySpark Cassandra Sample Driver")
    conf.set("spark.cassandra.connection.host", "devzero")
    sc = CassandraSparkContext(conf=conf)
  
   [root@devzero spark]# /usr/local/bin/docker-enter  spark-master bash
   -c
   /spark/bin/spark-submit --py-files /spark/pyspark_cassandra.py
   --jars
   

Re: Magic number 16: Why doesn't Spark Streaming process more than 16 files?

2015-02-16 Thread Emre Sevinc
I've managed to solve this, but I still don't know exactly why my solution
works:

In my code I was trying to force the Spark to output via:

  jsonIn.print();

jsonIn being a JavaDStream<String>.

When I removed the code above, and added the code below to force the output
operation, hence the execution:

jsonIn.foreachRDD(new Function<JavaRDD<String>, Void>() {
  @Override
  public Void call(JavaRDD<String> stringJavaRDD) throws Exception {
    stringJavaRDD.collect();
    return null;
  }
});

It works as I expect, processing all of the 20 files I give to it, instead
of stopping at 16.

--
Emre


On Mon, Feb 16, 2015 at 12:56 PM, Emre Sevinc emre.sev...@gmail.com wrote:

 Hello,

 I have an application in Java that uses Spark Streaming 1.2.1 in the
 following manner:

  - Listen to the input directory.
  - If a new file is copied to that input directory process it.
  - Process: contact a RESTful web service (running also locally and
 responsive), send the contents of the file, receive the response from the
 web service, write the results as a new file into the output directory
  - batch interval : 30 seconds
  - checkpoint interval: 150 seconds

 When I test the application locally with 1 or 2 files, it works perfectly
 fine as expected. I run it like:

 spark-submit --class myClass --verbose --master local[4]
 --deploy-mode client myApp.jar /in file:///out

 But then I've realized something strange when I copied 20 files to the
 INPUT directory: Spark Streaming detects all of the files, but it ends up
 processing *only 16 files*. And the remaining 4 are not processed at all.

 I've tried it with 19, 18, and then 17 files. Same result, only 16 files
 end up in the output directory.

 Then I've tried it by copying 16 files at once to the input directory, and
 it can process all of the 16 files. That's why I call it magic number 16.

 When I mean it detects all of the files, I mean that in the logs I see the
 following lines when I copy 17 files:


 ===
 2015-02-16 12:30:51 INFO  SpotlightDriver:70 - spark.executor.memory: 1G
 2015-02-16 12:30:51 WARN  Utils:71 - Your hostname, emre-ubuntu resolves
 to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface
 eth0)
 2015-02-16 12:30:51 WARN  Utils:71 - Set SPARK_LOCAL_IP if you need to
 bind to another address
 2015-02-16 12:30:52 INFO  Slf4jLogger:80 - Slf4jLogger started
 2015-02-16 12:30:52 WARN  NativeCodeLoader:62 - Unable to load
 native-hadoop library for your platform... using builtin-java classes where
 applicable
 2015-02-16 12:30:53 INFO  WriteAheadLogManager  for
 ReceivedBlockHandlerMaster:59 - Recovered 2 write ahead log files from
 file:/tmp/receivedBlockMetadata
 2015-02-16 12:30:53 INFO  WriteAheadLogManager  for
 ReceivedBlockHandlerMaster:59 - Reading from the logs:
 file:/tmp/receivedBlockMetadata/log-1424086110599-1424086170599
 file:/tmp/receivedBlockMetadata/log-1424086200861-1424086260861
 ---
 Time: 142408626 ms
 ---

 2015-02-16 12:31:00 INFO  WriteAheadLogManager  for
 ReceivedBlockHandlerMaster:59 - Attempting to clear 0 old log files in
 file:/tmp/receivedBlockMetadata older than 142408596:
 2015-02-16 12:31:00 INFO  WriteAheadLogManager  for
 ReceivedBlockHandlerMaster:59 - Cleared log files in
 file:/tmp/receivedBlockMetadata older than 142408596
 2015-02-16 12:31:00 INFO  WriteAheadLogManager  for
 ReceivedBlockHandlerMaster:59 - Attempting to clear 0 old log files in
 file:/tmp/receivedBlockMetadata older than 142408596:
 2015-02-16 12:31:00 INFO  WriteAheadLogManager  for
 ReceivedBlockHandlerMaster:59 - Cleared log files in
 file:/tmp/receivedBlockMetadata older than 142408596
 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
 process : 1
 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
 process : 1
 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
 process : 1
 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
 process : 1
 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
 process : 1
 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
 process : 1
 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
 process : 1
 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
 process : 1
 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
 process : 1
 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
 process : 1
 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
 process : 1
 2015-02-16 12:31:31 INFO  FileInputFormat:280 - Total input paths to
 process : 1
 2015-02-16 12:31:31 INFO  FileInputFormat:280 - Total input paths to
 process : 1
 2015-02-16 12:31:31 INFO  

Re: hive-thriftserver maven artifact

2015-02-16 Thread Ted Yu
I searched for 'spark-hive-thriftserver_2.10' on this page:
http://mvnrepository.com/artifact/org.apache.spark

Looks like it is not published.

On Mon, Feb 16, 2015 at 5:44 AM, Marco marco@gmail.com wrote:

 Hi,

 I am referring to https://issues.apache.org/jira/browse/SPARK-4925 (Hive
 Thriftserver Maven Artifact). Can somebody point me (URL) to the artifact
 in a public repository ? I have not found it @Maven Central.

 Thanks,
 Marco




Re: Extract hour from Timestamp in Spark SQL

2015-02-16 Thread Wush Wu
Dear Cheng Hao,

You are right!

After using the HiveContext, the issue is solved.

Thanks,
Wush
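For readers who hit the same error, a minimal sketch of the HiveContext approach (the input file, table and column names are assumptions, not taken from this thread):

import org.apache.spark.sql.hive.HiveContext

val hiveCtx = new HiveContext(sc)   // sc: an existing SparkContext, e.g. in spark-shell
hiveCtx.jsonFile("events.json").registerTempTable("events")
// hour() is the Hive UDF, which becomes available through HiveContext:
val hours = hiveCtx.sql("SELECT hour(ts) FROM events")
hours.collect().foreach(println)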

2015-02-15 10:42 GMT+08:00 Cheng, Hao hao.ch...@intel.com:

  Are you using the SQLContext? I think the HiveContext is recommended.



 Cheng Hao



 *From:* Wush Wu [mailto:w...@bridgewell.com]
 *Sent:* Thursday, February 12, 2015 2:24 PM
 *To:* u...@spark.incubator.apache.org
 *Subject:* Extract hour from Timestamp in Spark SQL



 Dear all,

 I am new to Spark SQL and have no experience of Hive.

 I tried to use the built-in Hive function to extract the hour from a
 timestamp in Spark SQL, but got: java.util.NoSuchElementException: key
 not found: hour

 How should I extract the hour from timestamp?

 And I am very confused about which functions I can use in Spark SQL. Is
 there any list of available functions other than
 http://spark.apache.org/docs/1.2.0/sql-programming-guide.html#compatibility-with-apache-hive
 ?

 Thanks,
 Wush







Re: Magic number 16: Why doesn't Spark Streaming process more than 16 files?

2015-02-16 Thread Emre Sevinc
Sean,

In this case, I've been testing the code on my local machine and using
Spark locally, so all the log output was available on my terminal. And
I've used the .print() method to have an output operation, just to force
Spark to execute.

And I was not using foreachRDD; I was only using the print() method on a
JavaDStream object, and it was working fine for a few files, up to 16 (and
without print() it did not do anything because there were no output
operations).

To sum it up, in my case:

 - Initially, use .print() and no foreachRDD: processes up to 16 files and
does not do anything for the remaining 4.
 - Remove .print() and use foreachRDD: processes all of the 20 files.

Maybe, as in Akhil Das's suggestion, using .count().print() might also have
fixed my problem, but I'm satisfied with the foreachRDD approach for now.
(Though it is still a mystery to me why using .print() made a difference;
maybe my mental model of Spark is wrong. I thought that no matter what output
operation I used, the number of files processed by Spark would be independent
of it, because the processing is done in a different method and .print() is
only used to force Spark to execute that processing. Am I wrong?)

--
Emre


On Mon, Feb 16, 2015 at 2:26 PM, Sean Owen so...@cloudera.com wrote:

 Materialization shouldn't be relevant. The collect by itself doesn't let
 you detect whether it happened. Print should print some results to the
 console but on different machines, so may not be a reliable way to see what
 happened.

 Yes I understand your real process uses foreachRDD and that's what you
 should use. It sounds like that works. But you must always have been using
 that right? What do you mean that you changed to use it?

 Basically I'm not clear on what the real code does and what about the
 output of that code tells you only 16 files were processed.
 On Feb 16, 2015 1:18 PM, Emre Sevinc emre.sev...@gmail.com wrote:

 Hello Sean,

 I did not understand your question very well, but what I do is checking
 the output directory (and I have various logger outputs at various stages
 showing the contents of an input file being processed, the response from
 the web service, etc.).

 By the way, I've already solved my problem by using foreachRDD instead of
 print (see my second message in this thread). Apparently forcing Spark to
 materialize DAG via print() is not the way to go. (My interpretation might
 be wrong, but this is what I've just seen in my case).

 --
 Emre




 On Mon, Feb 16, 2015 at 2:11 PM, Sean Owen so...@cloudera.com wrote:

 How are you deciding whether files are processed or not? It doesn't seem
 possible from this code. Maybe it just seems so.
 On Feb 16, 2015 12:51 PM, Emre Sevinc emre.sev...@gmail.com wrote:

 I've managed to solve this, but I still don't know exactly why my
 solution works:

 In my code I was trying to force the Spark to output via:

   jsonIn.print();

 jsonIn being a JavaDStream<String>.

 When I removed the code above, and added the code below to force the
 output operation, hence the execution:

 jsonIn.foreachRDD(new Function<JavaRDD<String>, Void>() {
   @Override
   public Void call(JavaRDD<String> stringJavaRDD) throws Exception {
     stringJavaRDD.collect();
     return null;
   }
 });

 It works as I expect, processing all of the 20 files I give to it,
 instead of stopping at 16.

 --
 Emre


 On Mon, Feb 16, 2015 at 12:56 PM, Emre Sevinc emre.sev...@gmail.com
 wrote:

 Hello,

 I have an application in Java that uses Spark Streaming 1.2.1 in the
 following manner:

  - Listen to the input directory.
  - If a new file is copied to that input directory process it.
  - Process: contact a RESTful web service (running also locally and
 responsive), send the contents of the file, receive the response from the
 web service, write the results as a new file into the output directory
  - batch interval : 30 seconds
  - checkpoint interval: 150 seconds

 When I test the application locally with 1 or 2 files, it works
 perfectly fine as expected. I run it like:

 spark-submit --class myClass --verbose --master local[4]
 --deploy-mode client myApp.jar /in file:///out

 But then I've realized something strange when I copied 20 files to the
 INPUT directory: Spark Streaming detects all of the files, but it ends up
 processing *only 16 files*. And the remaining 4 are not processed at all.

 I've tried it with 19, 18, and then 17 files. Same result, only 16
 files end up in the output directory.

 Then I've tried it by copying 16 files at once to the input directory,
 and it can process all of the 16 files. That's why I call it magic number
 16.

 When I mean it detects all of the files, I mean that in the logs I see
 the following lines when I copy 17 files:


 ===
 2015-02-16 12:30:51 INFO  SpotlightDriver:70 - spark.executor.memory:
 1G
 2015-02-16 12:30:51 WARN  

Re: Magic number 16: Why doesn't Spark Streaming process more than 16 files?

2015-02-16 Thread Sean Owen
How are you deciding whether files are processed or not? It doesn't seem
possible from this code. Maybe it just seems so.
On Feb 16, 2015 12:51 PM, Emre Sevinc emre.sev...@gmail.com wrote:

 I've managed to solve this, but I still don't know exactly why my solution
 works:

 In my code I was trying to force the Spark to output via:

   jsonIn.print();

 jsonIn being a JavaDStream<String>.

 When I removed the code above, and added the code below to force the
 output operation, hence the execution:

 jsonIn.foreachRDD(new Function<JavaRDD<String>, Void>() {
   @Override
   public Void call(JavaRDD<String> stringJavaRDD) throws Exception {
     stringJavaRDD.collect();
     return null;
   }
 });

 It works as I expect, processing all of the 20 files I give to it, instead
 of stopping at 16.

 --
 Emre


 On Mon, Feb 16, 2015 at 12:56 PM, Emre Sevinc emre.sev...@gmail.com
 wrote:

 Hello,

 I have an application in Java that uses Spark Streaming 1.2.1 in the
 following manner:

  - Listen to the input directory.
  - If a new file is copied to that input directory process it.
  - Process: contact a RESTful web service (running also locally and
 responsive), send the contents of the file, receive the response from the
 web service, write the results as a new file into the output directory
  - batch interval : 30 seconds
  - checkpoint interval: 150 seconds

 When I test the application locally with 1 or 2 files, it works perfectly
 fine as expected. I run it like:

 spark-submit --class myClass --verbose --master local[4]
 --deploy-mode client myApp.jar /in file:///out

 But then I've realized something strange when I copied 20 files to the
 INPUT directory: Spark Streaming detects all of the files, but it ends up
 processing *only 16 files*. And the remaining 4 are not processed at all.

 I've tried it with 19, 18, and then 17 files. Same result, only 16 files
 end up in the output directory.

 Then I've tried it by copying 16 files at once to the input directory,
 and it can process all of the 16 files. That's why I call it magic number
 16.

 When I mean it detects all of the files, I mean that in the logs I see
 the following lines when I copy 17 files:


 ===
 2015-02-16 12:30:51 INFO  SpotlightDriver:70 - spark.executor.memory: 1G
 2015-02-16 12:30:51 WARN  Utils:71 - Your hostname, emre-ubuntu resolves
 to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface
 eth0)
 2015-02-16 12:30:51 WARN  Utils:71 - Set SPARK_LOCAL_IP if you need to
 bind to another address
 2015-02-16 12:30:52 INFO  Slf4jLogger:80 - Slf4jLogger started
 2015-02-16 12:30:52 WARN  NativeCodeLoader:62 - Unable to load
 native-hadoop library for your platform... using builtin-java classes where
 applicable
 2015-02-16 12:30:53 INFO  WriteAheadLogManager  for
 ReceivedBlockHandlerMaster:59 - Recovered 2 write ahead log files from
 file:/tmp/receivedBlockMetadata
 2015-02-16 12:30:53 INFO  WriteAheadLogManager  for
 ReceivedBlockHandlerMaster:59 - Reading from the logs:
 file:/tmp/receivedBlockMetadata/log-1424086110599-1424086170599
 file:/tmp/receivedBlockMetadata/log-1424086200861-1424086260861
 ---
 Time: 142408626 ms
 ---

 2015-02-16 12:31:00 INFO  WriteAheadLogManager  for
 ReceivedBlockHandlerMaster:59 - Attempting to clear 0 old log files in
 file:/tmp/receivedBlockMetadata older than 142408596:
 2015-02-16 12:31:00 INFO  WriteAheadLogManager  for
 ReceivedBlockHandlerMaster:59 - Cleared log files in
 file:/tmp/receivedBlockMetadata older than 142408596
 2015-02-16 12:31:00 INFO  WriteAheadLogManager  for
 ReceivedBlockHandlerMaster:59 - Attempting to clear 0 old log files in
 file:/tmp/receivedBlockMetadata older than 142408596:
 2015-02-16 12:31:00 INFO  WriteAheadLogManager  for
 ReceivedBlockHandlerMaster:59 - Cleared log files in
 file:/tmp/receivedBlockMetadata older than 142408596
 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
 process : 1
 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
 process : 1
 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
 process : 1
 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
 process : 1
 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
 process : 1
 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
 process : 1
 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
 process : 1
 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
 process : 1
 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
 process : 1
 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
 process : 1
 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths 

hive-thriftserver maven artifact

2015-02-16 Thread Marco
Hi,

I am referring to https://issues.apache.org/jira/browse/SPARK-4925 (Hive
Thriftserver Maven Artifact). Can somebody point me (URL) to the artifact
in a public repository ? I have not found it @Maven Central.

Thanks,
Marco


Re: Magic number 16: Why doesn't Spark Streaming process more than 16 files?

2015-02-16 Thread Emre Sevinc
Hello Sean,

I did not understand your question very well, but what I do is check the
output directory (and I have various logger outputs at various stages
showing the contents of an input file being processed, the response from
the web service, etc.).

By the way, I've already solved my problem by using foreachRDD instead of
print (see my second message in this thread). Apparently forcing Spark to
materialize the DAG via print() is not the way to go. (My interpretation might
be wrong, but this is what I've just seen in my case.)

--
Emre




On Mon, Feb 16, 2015 at 2:11 PM, Sean Owen so...@cloudera.com wrote:

 How are you deciding whether files are processed or not? It doesn't seem
 possible from this code. Maybe it just seems so.
 On Feb 16, 2015 12:51 PM, Emre Sevinc emre.sev...@gmail.com wrote:

 I've managed to solve this, but I still don't know exactly why my
 solution works:

 In my code I was trying to force the Spark to output via:

   jsonIn.print();

 jsonIn being a JavaDStream<String>.

 When I removed the code above, and added the code below to force the
 output operation, hence the execution:

 jsonIn.foreachRDD(new Function<JavaRDD<String>, Void>() {
   @Override
   public Void call(JavaRDD<String> stringJavaRDD) throws Exception {
     stringJavaRDD.collect();
     return null;
   }
 });

 It works as I expect, processing all of the 20 files I give to it,
 instead of stopping at 16.

 --
 Emre


 On Mon, Feb 16, 2015 at 12:56 PM, Emre Sevinc emre.sev...@gmail.com
 wrote:

 Hello,

 I have an application in Java that uses Spark Streaming 1.2.1 in the
 following manner:

  - Listen to the input directory.
  - If a new file is copied to that input directory process it.
  - Process: contact a RESTful web service (running also locally and
 responsive), send the contents of the file, receive the response from the
 web service, write the results as a new file into the output directory
  - batch interval : 30 seconds
  - checkpoint interval: 150 seconds

 When I test the application locally with 1 or 2 files, it works
 perfectly fine as expected. I run it like:

 spark-submit --class myClass --verbose --master local[4]
 --deploy-mode client myApp.jar /in file:///out

 But then I've realized something strange when I copied 20 files to the
 INPUT directory: Spark Streaming detects all of the files, but it ends up
 processing *only 16 files*. And the remaining 4 are not processed at all.

 I've tried it with 19, 18, and then 17 files. Same result, only 16 files
 end up in the output directory.

 Then I've tried it by copying 16 files at once to the input directory,
 and it can process all of the 16 files. That's why I call it magic number
 16.

 When I mean it detects all of the files, I mean that in the logs I see
 the following lines when I copy 17 files:


 ===
 2015-02-16 12:30:51 INFO  SpotlightDriver:70 - spark.executor.memory:
 1G
 2015-02-16 12:30:51 WARN  Utils:71 - Your hostname, emre-ubuntu resolves
 to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface
 eth0)
 2015-02-16 12:30:51 WARN  Utils:71 - Set SPARK_LOCAL_IP if you need to
 bind to another address
 2015-02-16 12:30:52 INFO  Slf4jLogger:80 - Slf4jLogger started
 2015-02-16 12:30:52 WARN  NativeCodeLoader:62 - Unable to load
 native-hadoop library for your platform... using builtin-java classes where
 applicable
 2015-02-16 12:30:53 INFO  WriteAheadLogManager  for
 ReceivedBlockHandlerMaster:59 - Recovered 2 write ahead log files from
 file:/tmp/receivedBlockMetadata
 2015-02-16 12:30:53 INFO  WriteAheadLogManager  for
 ReceivedBlockHandlerMaster:59 - Reading from the logs:
 file:/tmp/receivedBlockMetadata/log-1424086110599-1424086170599
 file:/tmp/receivedBlockMetadata/log-1424086200861-1424086260861
 ---
 Time: 142408626 ms
 ---

 2015-02-16 12:31:00 INFO  WriteAheadLogManager  for
 ReceivedBlockHandlerMaster:59 - Attempting to clear 0 old log files in
 file:/tmp/receivedBlockMetadata older than 142408596:
 2015-02-16 12:31:00 INFO  WriteAheadLogManager  for
 ReceivedBlockHandlerMaster:59 - Cleared log files in
 file:/tmp/receivedBlockMetadata older than 142408596
 2015-02-16 12:31:00 INFO  WriteAheadLogManager  for
 ReceivedBlockHandlerMaster:59 - Attempting to clear 0 old log files in
 file:/tmp/receivedBlockMetadata older than 142408596:
 2015-02-16 12:31:00 INFO  WriteAheadLogManager  for
 ReceivedBlockHandlerMaster:59 - Cleared log files in
 file:/tmp/receivedBlockMetadata older than 142408596
 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
 process : 1
 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
 process : 1
 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
 process : 1
 2015-02-16 12:31:30 INFO  

Re: Magic number 16: Why doesn't Spark Streaming process more than 16 files?

2015-02-16 Thread Sean Owen
Materialization shouldn't be relevant. The collect by itself doesn't let
you detect whether it happened. Print should print some results to the
console but on different machines, so may not be a reliable way to see what
happened.

Yes I understand your real process uses foreachRDD and that's what you
should use. It sounds like that works. But you must always have been using
that right? What do you mean that you changed to use it?

Basically I'm not clear on what the real code does and what about the
output of that code tells you only 16 files were processed.
On Feb 16, 2015 1:18 PM, Emre Sevinc emre.sev...@gmail.com wrote:

 Hello Sean,

 I did not understand your question very well, but what I do is checking
 the output directory (and I have various logger outputs at various stages
 showing the contents of an input file being processed, the response from
 the web service, etc.).

 By the way, I've already solved my problem by using foreachRDD instead of
 print (see my second message in this thread). Apparently forcing Spark to
 materialize DAG via print() is not the way to go. (My interpretation might
 be wrong, but this is what I've just seen in my case).

 --
 Emre




 On Mon, Feb 16, 2015 at 2:11 PM, Sean Owen so...@cloudera.com wrote:

 How are you deciding whether files are processed or not? It doesn't seem
 possible from this code. Maybe it just seems so.
 On Feb 16, 2015 12:51 PM, Emre Sevinc emre.sev...@gmail.com wrote:

 I've managed to solve this, but I still don't know exactly why my
 solution works:

 In my code I was trying to force the Spark to output via:

   jsonIn.print();

 jsonIn being a JavaDStream<String>.

 When I removed the code above, and added the code below to force the
 output operation, hence the execution:

 jsonIn.foreachRDD(new Function<JavaRDD<String>, Void>() {
   @Override
   public Void call(JavaRDD<String> stringJavaRDD) throws Exception {
     stringJavaRDD.collect();
     return null;
   }
 });

 It works as I expect, processing all of the 20 files I give to it,
 instead of stopping at 16.

 --
 Emre


 On Mon, Feb 16, 2015 at 12:56 PM, Emre Sevinc emre.sev...@gmail.com
 wrote:

 Hello,

 I have an application in Java that uses Spark Streaming 1.2.1 in the
 following manner:

  - Listen to the input directory.
  - If a new file is copied to that input directory process it.
  - Process: contact a RESTful web service (running also locally and
 responsive), send the contents of the file, receive the response from the
 web service, write the results as a new file into the output directory
  - batch interval : 30 seconds
  - checkpoint interval: 150 seconds

 When I test the application locally with 1 or 2 files, it works
 perfectly fine as expected. I run it like:

 spark-submit --class myClass --verbose --master local[4]
 --deploy-mode client myApp.jar /in file:///out

 But then I've realized something strange when I copied 20 files to the
 INPUT directory: Spark Streaming detects all of the files, but it ends up
 processing *only 16 files*. And the remaining 4 are not processed at all.

 I've tried it with 19, 18, and then 17 files. Same result, only 16
 files end up in the output directory.

 Then I've tried it by copying 16 files at once to the input directory,
 and it can process all of the 16 files. That's why I call it magic number
 16.

 When I mean it detects all of the files, I mean that in the logs I see
 the following lines when I copy 17 files:


 ===
 2015-02-16 12:30:51 INFO  SpotlightDriver:70 - spark.executor.memory:
 1G
 2015-02-16 12:30:51 WARN  Utils:71 - Your hostname, emre-ubuntu
 resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on
 interface eth0)
 2015-02-16 12:30:51 WARN  Utils:71 - Set SPARK_LOCAL_IP if you need to
 bind to another address
 2015-02-16 12:30:52 INFO  Slf4jLogger:80 - Slf4jLogger started
 2015-02-16 12:30:52 WARN  NativeCodeLoader:62 - Unable to load
 native-hadoop library for your platform... using builtin-java classes where
 applicable
 2015-02-16 12:30:53 INFO  WriteAheadLogManager  for
 ReceivedBlockHandlerMaster:59 - Recovered 2 write ahead log files from
 file:/tmp/receivedBlockMetadata
 2015-02-16 12:30:53 INFO  WriteAheadLogManager  for
 ReceivedBlockHandlerMaster:59 - Reading from the logs:
 file:/tmp/receivedBlockMetadata/log-1424086110599-1424086170599
 file:/tmp/receivedBlockMetadata/log-1424086200861-1424086260861
 ---
 Time: 142408626 ms
 ---

 2015-02-16 12:31:00 INFO  WriteAheadLogManager  for
 ReceivedBlockHandlerMaster:59 - Attempting to clear 0 old log files in
 file:/tmp/receivedBlockMetadata older than 142408596:
 2015-02-16 12:31:00 INFO  WriteAheadLogManager  for
 ReceivedBlockHandlerMaster:59 - Cleared log files in
 file:/tmp/receivedBlockMetadata older 

Re: Magic number 16: Why doesn't Spark Streaming process more than 16 files?

2015-02-16 Thread Akhil Das
Instead of print you should do jsonIn.count().print(). The straightforward
approach is to use foreachRDD :)

Thanks
Best Regards
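
For illustration, a minimal self-contained sketch of the two output operations discussed in this thread (written in Scala for brevity, while the thread's code is Java; the monitored directory and application name are assumptions):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object OutputOperationSketch {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("OutputOperationSketch"), Seconds(30))
    val jsonIn = ssc.textFileStream("file:///in")   // monitored input directory, assumed

    // Either of the lines below registers an output operation and therefore forces each batch:
    jsonIn.count().print()                                              // prints only the per-batch count
    jsonIn.foreachRDD(rdd => println("records in batch: " + rdd.count()))

    ssc.start()
    ssc.awaitTermination()
  }
}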

On Mon, Feb 16, 2015 at 6:48 PM, Emre Sevinc emre.sev...@gmail.com wrote:

 Hello Sean,

 I did not understand your question very well, but what I do is checking
 the output directory (and I have various logger outputs at various stages
 showing the contents of an input file being processed, the response from
 the web service, etc.).

 By the way, I've already solved my problem by using foreachRDD instead of
 print (see my second message in this thread). Apparently forcing Spark to
 materialize DAG via print() is not the way to go. (My interpretation might
 be wrong, but this is what I've just seen in my case).

 --
 Emre




 On Mon, Feb 16, 2015 at 2:11 PM, Sean Owen so...@cloudera.com wrote:

 How are you deciding whether files are processed or not? It doesn't seem
 possible from this code. Maybe it just seems so.
 On Feb 16, 2015 12:51 PM, Emre Sevinc emre.sev...@gmail.com wrote:

 I've managed to solve this, but I still don't know exactly why my
 solution works:

 In my code I was trying to force the Spark to output via:

   jsonIn.print();

 jsonIn being a JavaDStream<String>.

 When I removed the code above, and added the code below to force the
 output operation, hence the execution:

 jsonIn.foreachRDD(new Function<JavaRDD<String>, Void>() {
   @Override
   public Void call(JavaRDD<String> stringJavaRDD) throws Exception {
     stringJavaRDD.collect();
     return null;
   }
 });

 It works as I expect, processing all of the 20 files I give to it,
 instead of stopping at 16.

 --
 Emre


 On Mon, Feb 16, 2015 at 12:56 PM, Emre Sevinc emre.sev...@gmail.com
 wrote:

 Hello,

 I have an application in Java that uses Spark Streaming 1.2.1 in the
 following manner:

  - Listen to the input directory.
  - If a new file is copied to that input directory process it.
  - Process: contact a RESTful web service (running also locally and
 responsive), send the contents of the file, receive the response from the
 web service, write the results as a new file into the output directory
  - batch interval : 30 seconds
  - checkpoint interval: 150 seconds

 When I test the application locally with 1 or 2 files, it works
 perfectly fine as expected. I run it like:

 spark-submit --class myClass --verbose --master local[4]
 --deploy-mode client myApp.jar /in file:///out

 But then I've realized something strange when I copied 20 files to the
 INPUT directory: Spark Streaming detects all of the files, but it ends up
 processing *only 16 files*. And the remaining 4 are not processed at all.

 I've tried it with 19, 18, and then 17 files. Same result, only 16
 files end up in the output directory.

 Then I've tried it by copying 16 files at once to the input directory,
 and it can process all of the 16 files. That's why I call it magic number
 16.

 When I mean it detects all of the files, I mean that in the logs I see
 the following lines when I copy 17 files:


 ===
 2015-02-16 12:30:51 INFO  SpotlightDriver:70 - spark.executor.memory:
 1G
 2015-02-16 12:30:51 WARN  Utils:71 - Your hostname, emre-ubuntu
 resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on
 interface eth0)
 2015-02-16 12:30:51 WARN  Utils:71 - Set SPARK_LOCAL_IP if you need to
 bind to another address
 2015-02-16 12:30:52 INFO  Slf4jLogger:80 - Slf4jLogger started
 2015-02-16 12:30:52 WARN  NativeCodeLoader:62 - Unable to load
 native-hadoop library for your platform... using builtin-java classes where
 applicable
 2015-02-16 12:30:53 INFO  WriteAheadLogManager  for
 ReceivedBlockHandlerMaster:59 - Recovered 2 write ahead log files from
 file:/tmp/receivedBlockMetadata
 2015-02-16 12:30:53 INFO  WriteAheadLogManager  for
 ReceivedBlockHandlerMaster:59 - Reading from the logs:
 file:/tmp/receivedBlockMetadata/log-1424086110599-1424086170599
 file:/tmp/receivedBlockMetadata/log-1424086200861-1424086260861
 ---
 Time: 142408626 ms
 ---

 2015-02-16 12:31:00 INFO  WriteAheadLogManager  for
 ReceivedBlockHandlerMaster:59 - Attempting to clear 0 old log files in
 file:/tmp/receivedBlockMetadata older than 142408596:
 2015-02-16 12:31:00 INFO  WriteAheadLogManager  for
 ReceivedBlockHandlerMaster:59 - Cleared log files in
 file:/tmp/receivedBlockMetadata older than 142408596
 2015-02-16 12:31:00 INFO  WriteAheadLogManager  for
 ReceivedBlockHandlerMaster:59 - Attempting to clear 0 old log files in
 file:/tmp/receivedBlockMetadata older than 142408596:
 2015-02-16 12:31:00 INFO  WriteAheadLogManager  for
 ReceivedBlockHandlerMaster:59 - Cleared log files in
 file:/tmp/receivedBlockMetadata older than 142408596
 2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input 

Question about spark streaming+Flume

2015-02-16 Thread bit1...@163.com
Hi,
I am trying Spark Streaming + Flume example:

1. Code
object SparkFlumeNGExample {
   def main(args : Array[String]) {
   val conf = new SparkConf().setAppName("SparkFlumeNGExample")
   val ssc = new StreamingContext(conf, Seconds(10))

   val lines = FlumeUtils.createStream(ssc,"localhost",)
// Print out the count of events received from this server in each batch
   lines.count().map(cnt => "Received " + cnt + " flume events. at " +
System.currentTimeMillis() ).print()
   ssc.start()
   ssc.awaitTermination();
}
}
2. I submit the application with following sh:
./spark-submit --deploy-mode client --name SparkFlumeEventCount --master 
spark://hadoop.master:7077 --executor-memory 512M --total-executor-cores 2 
--class spark.examples.streaming.SparkFlumeNGWordCount spark-streaming-flume.jar


When I write data to Flume, I only see the following console output
indicating that input has been added.
storage.BlockManagerInfo: Added input-0-1424151807400 in memory on 
localhost:39338 (size: 1095.0 B, free: 267.2 MB) 
15/02/17 00:43:30 INFO scheduler.JobScheduler: Added jobs for time 
142415181 ms 
15/02/17 00:43:40 INFO scheduler.JobScheduler: Added jobs for time 
142415182 ms

15/02/17 00:43:40 INFO scheduler.JobScheduler: Added jobs for time 
142415187 ms

But I didn't see the output from the code: "Received X flume events"

I have no idea where the problem is, any ideas? Thanks







Identify the performance bottleneck from hardware prospective

2015-02-16 Thread Julaiti Alafate
Hi there,

I am trying to scale up the data size that my application is handling. This
application is running on a cluster with 16 slave nodes. Each slave node
has 60GB of memory. It is running in standalone mode. The data is coming from
HDFS, which is also in the same local network.

In order to understand how my program is running, I also have Ganglia
installed on the cluster. From a previous run, I know the stage that takes
the longest time to run is counting word pairs (my RDD consists of
sentences from a corpus). My goal is to identify the bottleneck of my
application, then modify my program or hardware configuration according to
that.

Unfortunately, I didn't find much information on Spark monitoring and
optimization topics. Reynold Xin gave a great talk at Spark Summit 2014 on
application tuning from the task perspective. Basically, his focus is on tasks
that are oddly slower than the average. However, it didn't solve my problem
because there are no such tasks that run way slower than others in my case.

So I tried to identify the bottleneck from the hardware perspective. I want to
know what the limitation of the cluster is. I think if the executors are
running hard, either CPU, memory or network bandwidth (or some
combination) should be hitting the roof. But Ganglia reports that the CPU
utilization of the cluster is no more than 50%, and network utilization is
high for several seconds at the beginning, then drops close to 0. From the
Spark UI, I can see the node with maximum memory usage is consuming around
6GB, while spark.executor.memory is set to 20GB.

I am very confused that the program is not running fast enough, while
hardware resources are not in short supply. Could you please give me some
hints about what decides the performance of a Spark application from a
hardware perspective?

Thanks!

Julaiti


Re: Question about spark streaming+Flume

2015-02-16 Thread Arush Kharbanda
Hi

Can you try this

val lines = FlumeUtils.createStream(ssc,"localhost",)
// Print out the count of events received from this server in each batch
   lines.count().map(cnt => "Received " + cnt + " flume events. at " +
System.currentTimeMillis() )

lines.forechRDD(_.foreach(println))

Thanks
Arush

On Tue, Feb 17, 2015 at 11:17 AM, bit1...@163.com bit1...@163.com wrote:

 Hi,
 I am trying Spark Streaming + Flume example:

 1. Code
 object SparkFlumeNGExample {
   def main(args : Array[String]) {
     val conf = new SparkConf().setAppName("SparkFlumeNGExample")
     val ssc = new StreamingContext(conf, Seconds(10))

     val lines = FlumeUtils.createStream(ssc,"localhost",)
     // Print out the count of events received from this server in each batch
     lines.count().map(cnt => "Received " + cnt + " flume events. at " +
       System.currentTimeMillis() ).print()
     ssc.start()
     ssc.awaitTermination();
   }
 }
 2. I submit the application with following sh:
 ./spark-submit --deploy-mode client --name SparkFlumeEventCount --master
 spark://hadoop.master:7077 --executor-memory 512M --total-executor-cores 2
 --class spark.examples.streaming.SparkFlumeNGWordCount
 spark-streaming-flume.jar


 When I write data to flume, I only notice the following console
 information that input is added.
 storage.BlockManagerInfo: Added input-0-1424151807400 in memory on
 localhost:39338 (size: 1095.0 B, free: 267.2 MB)
 15/02/17 00:43:30 INFO scheduler.JobScheduler: Added jobs for time
 142415181 ms
 15/02/17 00:43:40 INFO scheduler.JobScheduler: Added jobs for time
 142415182 ms
 
 15/02/17 00:43:40 INFO scheduler.JobScheduler: Added jobs for time
 142415187 ms

 But I didn't the output from the code: Received X flumes events

 I am no idea where the problem is, any idea? Thanks


 --




-- 


*Arush Kharbanda* || Technical Teamlead

ar...@sigmoidanalytics.com || www.sigmoidanalytics.com


Re: Re: Question about spark streaming+Flume

2015-02-16 Thread bit1...@163.com
Ok, you are missing a letter in foreachRDD.. let me proceed..



bit1...@163.com
 
From: Arush Kharbanda
Date: 2015-02-17 14:31
To: bit1...@163.com
CC: user
Subject: Re: Question about spark streaming+Flume
Hi

Can you try this

val lines = FlumeUtils.createStream(ssc,"localhost",)
// Print out the count of events received from this server in each batch
   lines.count().map(cnt => "Received " + cnt + " flume events. at " +
System.currentTimeMillis() )

lines.forechRDD(_.foreach(println))

Thanks
Arush

On Tue, Feb 17, 2015 at 11:17 AM, bit1...@163.com bit1...@163.com wrote:
Hi,
I am trying Spark Streaming + Flume example:

1. Code
object SparkFlumeNGExample {
   def main(args : Array[String]) {
   val conf = new SparkConf().setAppName("SparkFlumeNGExample")
   val ssc = new StreamingContext(conf, Seconds(10))

   val lines = FlumeUtils.createStream(ssc,"localhost",)
// Print out the count of events received from this server in each batch
   lines.count().map(cnt => "Received " + cnt + " flume events. at " +
System.currentTimeMillis() ).print()
   ssc.start()
   ssc.awaitTermination();
}
}
2. I submit the application with following sh:
./spark-submit --deploy-mode client --name SparkFlumeEventCount --master 
spark://hadoop.master:7077 --executor-memory 512M --total-executor-cores 2 
--class spark.examples.streaming.SparkFlumeNGWordCount spark-streaming-flume.jar


When I write data to flume, I only notice the following console information 
that input is added.
storage.BlockManagerInfo: Added input-0-1424151807400 in memory on 
localhost:39338 (size: 1095.0 B, free: 267.2 MB) 
15/02/17 00:43:30 INFO scheduler.JobScheduler: Added jobs for time 
142415181 ms 
15/02/17 00:43:40 INFO scheduler.JobScheduler: Added jobs for time 
142415182 ms

15/02/17 00:43:40 INFO scheduler.JobScheduler: Added jobs for time 
142415187 ms

But I didn't the output from the code: Received X flumes events

I am no idea where the problem is, any idea? Thanks








-- 
Arush Kharbanda || Technical Teamlead
ar...@sigmoidanalytics.com || www.sigmoidanalytics.com


Re: Re: Question about spark streaming+Flume

2015-02-16 Thread bit1...@163.com
Thanks Arush..
With your code, a compile error occurs:

Error:(19, 11) value forechRDD is not a member of 
org.apache.spark.streaming.dstream.ReceiverInputDStream[org.apache.spark.streaming.flume.SparkFlumeEvent]
 
lines.forechRDD(_.foreach(println)) 
^
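
For completeness, a hedged sketch of the intended call: the method name is foreachRDD, and the mapped stream has to be captured in a val before printing (the host and port here are placeholders, not taken from this thread):

import org.apache.spark.streaming.flume.FlumeUtils

val lines = FlumeUtils.createStream(ssc, "localhost", 9999)   // ssc as defined earlier; the port is an assumption
val counts = lines.count().map(cnt =>
  "Received " + cnt + " flume events. at " + System.currentTimeMillis())
counts.foreachRDD(_.foreach(println))   // println runs on the executors, so check executor stdout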




From: Arush Kharbanda
Date: 2015-02-17 14:31
To: bit1...@163.com
CC: user
Subject: Re: Question about spark streaming+Flume
Hi

Can you try this

val lines = FlumeUtils.createStream(ssc,"localhost",)
// Print out the count of events received from this server in each batch
   lines.count().map(cnt => "Received " + cnt + " flume events. at " +
System.currentTimeMillis() )

lines.forechRDD(_.foreach(println))

Thanks
Arush

On Tue, Feb 17, 2015 at 11:17 AM, bit1...@163.com bit1...@163.com wrote:
Hi,
I am trying Spark Streaming + Flume example:

1. Code
object SparkFlumeNGExample {
   def main(args : Array[String]) {
   val conf = new SparkConf().setAppName("SparkFlumeNGExample")
   val ssc = new StreamingContext(conf, Seconds(10))

   val lines = FlumeUtils.createStream(ssc,"localhost",)
// Print out the count of events received from this server in each batch
   lines.count().map(cnt => "Received " + cnt + " flume events. at " +
System.currentTimeMillis() ).print()
   ssc.start()
   ssc.awaitTermination();
}
}
2. I submit the application with following sh:
./spark-submit --deploy-mode client --name SparkFlumeEventCount --master 
spark://hadoop.master:7077 --executor-memory 512M --total-executor-cores 2 
--class spark.examples.streaming.SparkFlumeNGWordCount spark-streaming-flume.jar


When I write data to flume, I only notice the following console information 
that input is added.
storage.BlockManagerInfo: Added input-0-1424151807400 in memory on 
localhost:39338 (size: 1095.0 B, free: 267.2 MB) 
15/02/17 00:43:30 INFO scheduler.JobScheduler: Added jobs for time 
142415181 ms 
15/02/17 00:43:40 INFO scheduler.JobScheduler: Added jobs for time 
142415182 ms

15/02/17 00:43:40 INFO scheduler.JobScheduler: Added jobs for time 
142415187 ms

But I didn't the output from the code: Received X flumes events

I am no idea where the problem is, any idea? Thanks








-- 
Arush Kharbanda || Technical Teamlead
ar...@sigmoidanalytics.com || www.sigmoidanalytics.com


java.lang.NoClassDefFoundError: org/apache/spark/SparkConf

2015-02-16 Thread siqi chen
Hello,

I have a simple Kafka Spark Streaming example which I am still developing
in the standalone mode.

Here is what is puzzling me,

If I build the assembly jar and use bin/spark-submit to run it, it works fine.
But if I try to run the code from within the IntelliJ IDE, then it fails with
this error:

Exception in thread "main" java.lang.NoClassDefFoundError:
org/apache/spark/SparkConf

...
Caused by: java.lang.ClassNotFoundException: org.apache.spark.SparkConf

Here is my build.sbt file


import _root_.sbt.Keys._
import _root_.sbtassembly.Plugin.AssemblyKeys._
import _root_.sbtassembly.Plugin.MergeStrategy
import _root_.sbtassembly.Plugin._
import AssemblyKeys._

assemblySettings

name := "test-kafka"

version := "1.0"

scalaVersion := "2.10.4"

jarName in assembly := "test-kafka-1.0.jar"

assemblyOption in assembly ~= { _.copy(includeScala = false) }

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.2.1" % "provided",
  "org.apache.spark" %% "spark-streaming" % "1.2.1" % "provided",
  ("org.apache.spark" %% "spark-streaming-kafka" % "1.2.1").
    exclude("commons-beanutils", "commons-beanutils").
    exclude("commons-collections", "commons-collections").
    exclude("com.esotericsoftware.minlog", "minlog").
    exclude("commons-logging", "commons-logging")
)

mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
{
  case x if x.startsWith("META-INF/ECLIPSEF.RSA") => MergeStrategy.last
  case x if x.startsWith("META-INF/mailcap") => MergeStrategy.last
  case x if x.startsWith("plugin.properties") => MergeStrategy.last
  case x => old(x)
}
}

I also have this in my project/plugins.sbt

resolvers += Resolver.url("sbt-plugin-releases-scalasbt",
url("http://repo.scala-sbt.org/scalasbt/sbt-plugin-releases/"))

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")

addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.7.4")

*What is even more interesting is that if I pin the Spark jar to 1.1.1
instead of 1.2.1, then I can successfully run it within IntelliJ. *
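
One sbt-side workaround that is sometimes used for this situation (an assumption, not something confirmed in this thread) is to keep the Spark dependencies marked provided for the assembly, but put the compile classpath back on the run task so that sbt run can still see them:

// Addition to build.sbt - a sketch only: "provided" dependencies are on the
// compile classpath but not on the default runtime classpath used by `run`.
run in Compile <<= Defaults.runTask(fullClasspath in Compile, mainClass in (Compile, run), runner in (Compile, run))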



​


Re: hive-thriftserver maven artifact

2015-02-16 Thread Arush Kharbanda
You can build your own spark with option -Phive-thriftserver.

You can publish the jars locally. I hope that would solve your problem.

On Mon, Feb 16, 2015 at 8:54 PM, Marco marco@gmail.com wrote:

 Ok, so will it only be available in the next version (1.3.0)?

 2015-02-16 15:24 GMT+01:00 Ted Yu yuzhih...@gmail.com:

 I searched for 'spark-hive-thriftserver_2.10' on this page:
 http://mvnrepository.com/artifact/org.apache.spark

 Looks like it is not published.

 On Mon, Feb 16, 2015 at 5:44 AM, Marco marco@gmail.com wrote:

 Hi,

 I am referring to https://issues.apache.org/jira/browse/SPARK-4925
 (Hive Thriftserver Maven Artifact). Can somebody point me (URL) to the
 artifact in a public repository ? I have not found it @Maven Central.

 Thanks,
 Marco





 --
 Viele Grüße,
 Marco




-- 


*Arush Kharbanda* || Technical Teamlead

ar...@sigmoidanalytics.com || www.sigmoidanalytics.com


MLib usage on Spark Streaming

2015-02-16 Thread Spico Florin
Hello!
  I'm a newbie to Spark and I have the following case study:
1. A client sends the following data every 100ms:
  {uniqueId, timestamp, measure1, measure2}
2. Every 30 seconds I would like to correlate the data collected in the
window with some predefined double vector pattern for each given key. The
predefined pattern has 300 records. The data should also be sorted by
timestamp.
3. When the correlation is greater than a predefined threshold (e.g. 0.9) I
would like to emit a new message containing {uniqueId,
doubleCorrelationValue}
4. For the correlation I would like to use MLlib
5. As a programming language I would like to use Java 7.

Can you please give me some suggestions on how to create the skeleton for
the above scenario?

Thanks.
 Regards,
 Florin
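
A rough skeleton for the scenario above, sketched in Scala for brevity (the same structure exists in the Java API). The input source, field layout, pattern values and parameters below are assumptions, and the Pearson correlation is computed with a small local helper because MLlib's Statistics.corr operates on whole RDDs rather than on per-key windows:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

object CorrelationSketch {
  // Plain Pearson correlation on two equal-length local arrays.
  def pearson(x: Array[Double], y: Array[Double]): Double = {
    val n = x.length
    val mx = x.sum / n
    val my = y.sum / n
    val cov = x.zip(y).map { case (a, b) => (a - mx) * (b - my) }.sum
    val sx = math.sqrt(x.map(a => (a - mx) * (a - mx)).sum)
    val sy = math.sqrt(y.map(b => (b - my) * (b - my)).sum)
    if (sx == 0.0 || sy == 0.0) 0.0 else cov / (sx * sy)
  }

  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("CorrelationSketch"), Seconds(1))
    val pattern = ssc.sparkContext.broadcast(Array.fill(300)(0.0))   // the predefined pattern, assumed
    val threshold = 0.9

    // Assumed input: one event per line, "uniqueId,timestamp,measure1,measure2".
    val events = ssc.socketTextStream("localhost", 9999).map { line =>
      val Array(id, ts, m1, _) = line.split(",")
      (id, (ts.toLong, m1.toDouble))
    }

    val matches = events
      .window(Seconds(30), Seconds(30))                   // 30-second tumbling window
      .groupByKey()
      .mapValues { vals =>
        val series = vals.toArray.sortBy(_._1).map(_._2)  // sort by timestamp, keep the measure
        val n = math.min(series.length, pattern.value.length)
        pearson(series.take(n), pattern.value.take(n))
      }
      .filter { case (_, corr) => corr > threshold }

    matches.print()                                       // replace with a real sink as needed
    ssc.start()
    ssc.awaitTermination()
  }
}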


Re: hive-thriftserver maven artifact

2015-02-16 Thread Marco
Ok, so will it only be available in the next version (1.3.0)?

2015-02-16 15:24 GMT+01:00 Ted Yu yuzhih...@gmail.com:

 I searched for 'spark-hive-thriftserver_2.10' on this page:
 http://mvnrepository.com/artifact/org.apache.spark

 Looks like it is not published.

 On Mon, Feb 16, 2015 at 5:44 AM, Marco marco@gmail.com wrote:

 Hi,

 I am referring to https://issues.apache.org/jira/browse/SPARK-4925 (Hive
 Thriftserver Maven Artifact). Can somebody point me (URL) to the artifact
 in a public repository ? I have not found it @Maven Central.

 Thanks,
 Marco





-- 
Viele Grüße,
Marco


Can't I mix non-Spark properties into a .properties file and pass it to spark-submit via --properties-file?

2015-02-16 Thread Emre Sevinc
Hello,

I'm using Spark 1.2.1 and have a module.properties file, and in it I have
non-Spark properties, as well as Spark properties, e.g.:

   job.output.dir=file:///home/emre/data/mymodule/out

I'm trying to pass it to spark-submit via:

   spark-submit --class com.myModule --master local[4] --deploy-mode client
--verbose --properties-file /home/emre/data/mymodule.properties
mymodule.jar

And I thought I could read the value of my non-Spark property, namely,
job.output.dir by using:

SparkConf sparkConf = new SparkConf();
final String validatedJSONoutputDir = sparkConf.get("job.output.dir");

But it gives me an exception:

Exception in thread "main" java.util.NoSuchElementException:
job.output.dir

Is it not possible to mix Spark and non-Spark properties in a single
.properties file, then pass it via --properties-file and then get the
values of those non-Spark properties via SparkConf?

Or is there another object / method to retrieve the values for those
non-Spark properties?


-- 
Emre Sevinç


Re: Can't I mix non-Spark properties into a .properties file and pass it to spark-submit via --properties-file?

2015-02-16 Thread Sean Owen
Since SparkConf is only for Spark properties, I think it will in
general only pay attention to and preserve spark.* properties. You
could experiment with that. In general I wouldn't rely on Spark
mechanisms for your configuration, and you can use any config
mechanism you like to retain your own properties.

On Mon, Feb 16, 2015 at 3:26 PM, Emre Sevinc emre.sev...@gmail.com wrote:
 Hello,

 I'm using Spark 1.2.1 and have a module.properties file, and in it I have
 non-Spark properties, as well as Spark properties, e.g.:

job.output.dir=file:///home/emre/data/mymodule/out

 I'm trying to pass it to spark-submit via:

spark-submit --class com.myModule --master local[4] --deploy-mode client
 --verbose --properties-file /home/emre/data/mymodule.properties mymodule.jar

 And I thought I could read the value of my non-Spark property, namely,
 job.output.dir by using:

 SparkConf sparkConf = new SparkConf();
 final String validatedJSONoutputDir = sparkConf.get(job.output.dir);

 But it gives me an exception:

 Exception in thread main java.util.NoSuchElementException:
 job.output.dir

 Is it not possible to mix Spark and non-Spark properties in a single
 .properties file, then pass it via --properties-file and then get the values
 of those non-Spark properties via SparkConf?

 Or is there another object / method to retrieve the values for those
 non-Spark properties?


 --
 Emre Sevinç

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Can't I mix non-Spark properties into a .properties file and pass it to spark-submit via --properties-file?

2015-02-16 Thread Emre Sevinc
Sean,

I'm trying this as an alternative to what I currently do. Currently I have
my module.properties file for my module in the resources directory, and
that file is put inside the über JAR file when I build my application with
Maven, and then when I submit it using spark-submit, I can read that
module.properties file via the traditional method:


properties.load(MyModule.class.getClassLoader().getResourceAsStream(module.properties));

and everything works fine. The disadvantage is that in order to make any
changes to that .properties file effective, I have to re-build my
application. Therefore I'm trying to find a way to be able to send that
module.properties file via spark-submit and read the values in it, so that
I will not be forced to build my application every time I want to make a
change in the module.properties file.

I've also checked the --files option of spark-submit, but I see that it
is for sending the listed files to executors (correct me if I'm wrong);
what I'm after is being able to pass dynamic properties (key/value pairs)
to the driver program of my Spark application. And I still could not find
out how to do that.

--
Emre
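
A hedged alternative for the question above (an assumption, not something confirmed in this thread, and sketched in Scala while the application is Java): leave SparkConf out of it, pass the .properties file path as an ordinary program argument after the jar on the spark-submit command line, and load it in the driver with java.util.Properties:

import java.io.FileInputStream
import java.util.Properties

object MyModuleConfig {
  def main(args: Array[String]): Unit = {
    // args(0) would be e.g. /home/emre/data/mymodule.properties
    val props = new Properties()
    val in = new FileInputStream(args(0))
    try props.load(in) finally in.close()
    println("job.output.dir = " + props.getProperty("job.output.dir"))
  }
}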





On Mon, Feb 16, 2015 at 4:28 PM, Sean Owen so...@cloudera.com wrote:

 Since SparkConf is only for Spark properties, I think it will in
 general only pay attention to and preserve spark.* properties. You
 could experiment with that. In general I wouldn't rely on Spark
 mechanisms for your configuration, and you can use any config
 mechanism you like to retain your own properties.

 On Mon, Feb 16, 2015 at 3:26 PM, Emre Sevinc emre.sev...@gmail.com
 wrote:
  Hello,
 
  I'm using Spark 1.2.1 and have a module.properties file, and in it I have
  non-Spark properties, as well as Spark properties, e.g.:
 
 job.output.dir=file:///home/emre/data/mymodule/out
 
  I'm trying to pass it to spark-submit via:
 
 spark-submit --class com.myModule --master local[4] --deploy-mode
 client
  --verbose --properties-file /home/emre/data/mymodule.properties
 mymodule.jar
 
  And I thought I could read the value of my non-Spark property, namely,
  job.output.dir by using:
 
  SparkConf sparkConf = new SparkConf();
  final String validatedJSONoutputDir =
 sparkConf.get(job.output.dir);
 
  But it gives me an exception:
 
  Exception in thread main java.util.NoSuchElementException:
  job.output.dir
 
  Is it not possible to mix Spark and non-Spark properties in a single
  .properties file, then pass it via --properties-file and then get the
 values
  of those non-Spark properties via SparkConf?
 
  Or is there another object / method to retrieve the values for those
  non-Spark properties?
 
 
  --
  Emre Sevinç




-- 
Emre Sevinc


Re: Can't I mix non-Spark properties into a .properties file and pass it to spark-submit via --properties-file?

2015-02-16 Thread Charles Feduke
I haven't actually tried mixing non-Spark settings into the Spark
properties. Instead I package my properties into the jar and use the
Typesafe Config[1] - v1.2.1 - library (along with Ficus[2] - Scala
specific) to get at my properties:

Properties file: src/main/resources/integration.conf

(below $ENV might be set to either integration or prod[3])

ssh -t root@$HOST /root/spark/bin/spark-shell --jars /root/$JAR_NAME \
--conf 'config.resource=$ENV.conf' \
--conf 'spark.executor.extraJavaOptions=-Dconfig.resource=$ENV.conf'

Since the properties file is packaged up with the JAR I don't have to worry
about sending the file separately to all of the slave nodes. Typesafe
Config is written in Java so it will work if you're not using Scala. (The
Typesafe Config also has the advantage of being extremely easy to integrate
with code that is using Java Properties today.)

If you instead want to send the file separately from the JAR and you use
the Typesafe Config library, you can specify config.file instead of
.resource; though I'd point you to [3] below if you want to make your
development life easier.

1. https://github.com/typesafehub/config
2. https://github.com/ceedubs/ficus
3.
http://deploymentzone.com/2015/01/27/spark-ec2-and-easy-spark-shell-deployment/
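
For reference, a minimal sketch of reading a value this way with Typesafe Config (the key name is an assumption):

import com.typesafe.config.ConfigFactory

val config = ConfigFactory.load()                    // honours -Dconfig.resource / -Dconfig.file
val outputDir = config.getString("job.output.dir")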



On Mon Feb 16 2015 at 10:27:01 AM Emre Sevinc emre.sev...@gmail.com wrote:

 Hello,

 I'm using Spark 1.2.1 and have a module.properties file, and in it I have
 non-Spark properties, as well as Spark properties, e.g.:

job.output.dir=file:///home/emre/data/mymodule/out

 I'm trying to pass it to spark-submit via:

spark-submit --class com.myModule --master local[4] --deploy-mode
 client --verbose --properties-file /home/emre/data/mymodule.properties
 mymodule.jar

 And I thought I could read the value of my non-Spark property, namely,
 job.output.dir by using:

 SparkConf sparkConf = new SparkConf();
 final String validatedJSONoutputDir = sparkConf.get(job.output.dir);

 But it gives me an exception:

 Exception in thread main java.util.NoSuchElementException:
 job.output.dir

 Is it not possible to mix Spark and non-Spark properties in a single
 .properties file, then pass it via --properties-file and then get the
 values of those non-Spark properties via SparkConf?

 Or is there another object / method to retrieve the values for those
 non-Spark properties?


 --
 Emre Sevinç



Re: Loading tables using parquetFile vs. loading tables from Hive metastore with Parquet serde

2015-02-16 Thread Cheng Lian
Hi Jianshi,

When accessing a Hive table with the Parquet SerDe, Spark SQL tries to convert
it into Spark SQL's native Parquet support for better performance. And yes,
predicate push-down and column pruning are applied here. In 1.3.0, we'll also
cover the write path, except for writing partitioned tables.
Cheng
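
A minimal sketch of what this looks like from the user side, assuming a Parquet table
registered in the metastore; the table and column names are placeholders, and the exact
configuration keys may differ between releases:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.hive.HiveContext

    val sc = new SparkContext(new SparkConf().setAppName("parquet-read"))
    val hiveContext = new HiveContext(sc)

    // When enabled, a metastore table stored as Parquet is read through Spark
    // SQL's native Parquet path instead of the Hive SerDe.
    hiveContext.setConf("spark.sql.hive.convertMetastoreParquet", "true")
    // Push filters down into the native Parquet reader as well.
    hiveContext.setConf("spark.sql.parquet.filterPushdown", "true")

    // Only col_a should be read, and the col_b predicate pushed down where possible.
    val result = hiveContext.sql("SELECT col_a FROM my_parquet_table WHERE col_b > 10")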

On Sun Feb 15 2015 at 9:22:15 AM Jianshi Huang jianshi.hu...@gmail.com
wrote:

 Hi,

 If I have a table in the Hive metastore saved as Parquet and I want to use it
 in Spark, it seems Spark will use Hive's Parquet SerDe to load the actual
 data.

 So is there any difference here? Will predicate pushdown, pruning and
 future Parquet optimizations in Spark SQL work when using the Hive SerDe?

 Loading tables using parquetFile vs. loading tables from Hive metastore
 with Parquet serde


 Thanks,
 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github  Blog: http://huangjs.github.com/



Which OutputCommitter to use for S3?

2015-02-16 Thread Mingyu Kim
Hi all,

The default OutputCommitter used by RDDs, which is FileOutputCommitter, seems to
require moving files at the commit step, which is not a constant-time operation on
S3, as discussed in
http://mail-archives.apache.org/mod_mbox/spark-user/201410.mbox/%3c543e33fa.2000...@entropy.be%3E.
People seem to develop their own NullOutputCommitter implementation or use
DirectFileOutputCommitter (as mentioned in SPARK-3595,
https://issues.apache.org/jira/browse/SPARK-3595), but I wanted to check whether
there is a de facto standard, publicly available OutputCommitter to use for S3 in
conjunction with Spark.

Thanks,
Mingyu
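
Not an answer to the de facto standard question, but for completeness: with the old
mapred API (saveAsTextFile / saveAsHadoopFile), a different committer can be plugged
in through the Hadoop configuration. A sketch, where com.example.DirectOutputCommitter
is a placeholder for whatever implementation is chosen, not a class that ships with
Spark or Hadoop:

    import org.apache.spark.{SparkConf, SparkContext}

    val sc = new SparkContext(new SparkConf().setAppName("s3-output"))

    // The old mapred API reads the committer class from this key; the class
    // named here is a placeholder for a direct/no-rename committer.
    sc.hadoopConfiguration.set("mapred.output.committer.class",
      "com.example.DirectOutputCommitter")

    val rdd = sc.parallelize(Seq("a", "b", "c"))
    rdd.saveAsTextFile("s3n://some-bucket/output")   // bucket/path is a placeholder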


Re: spark-local dir running out of space during long ALS run

2015-02-16 Thread Antony Mayi
spark.cleaner.ttl is not the right way - it seems to be really designed for
streaming. Although it keeps the disk usage under control, it also causes the loss
of RDDs and broadcasts that are required later, leading to a crash.
Is there any other way?

Thanks,
Antony.

 On Sunday, 15 February 2015, 21:42, Antony Mayi antonym...@yahoo.com 
wrote:
   
 

 spark.cleaner.ttl ? 

 On Sunday, 15 February 2015, 18:23, Antony Mayi antonym...@yahoo.com 
wrote:
   
 

 Hi,

I am running a bigger ALS job on Spark 1.2.0 on YARN (CDH 5.3.0) - ALS is using about
3 billion ratings and I am doing several trainImplicit() runs in a loop within one
Spark session. I have a four-node cluster with 3 TB of disk space on each node. Before
starting the job, less than 8% of the disk space is used. While the ALS job is running
I can see the disk usage growing rapidly, mainly because of files being stored under
yarn/local/usercache/user/appcache/application_XXX_YYY/spark-local-ZZZ-AAA.
After about 10 hours the disk usage hits 90% and YARN kills the particular containers.

Am I missing some cleanup somewhere while looping over the several trainImplicit()
calls? Taking 4*3 TB of disk space seems immense.

Thanks for any help,
Antony.
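
A minimal sketch of one cleanup that is sometimes suggested for a loop like this: drop
the reference to the previous run's model and unpersist its factor RDDs before keeping
the next one, so the ContextCleaner can also remove the shuffle files behind them. No
guarantee this alone stops the spark-local growth described above; ratings, rank,
numIterations and runs are placeholders:

    import org.apache.spark.mllib.recommendation.{ALS, MatrixFactorizationModel, Rating}
    import org.apache.spark.rdd.RDD

    def runAll(ratings: RDD[Rating], rank: Int, numIterations: Int, runs: Int): Unit = {
      var previous: Option[MatrixFactorizationModel] = None
      for (i <- 1 to runs) {
        val model = ALS.trainImplicit(ratings, rank, numIterations)
        // ... evaluate / save this run's model here ...

        // Release the previous run's cached factors before holding on to the new one.
        previous.foreach { m =>
          m.userFeatures.unpersist()
          m.productFeatures.unpersist()
        }
        previous = Some(model)
      }
    }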

 


 
   

Re: Array in broadcast can't be serialized

2015-02-16 Thread Tao Xiao
Thanks Ted

After searching for a whole day, I still don't know how to make Spark use
Twitter chill for serialization - there are very few documents about how to
integrate Twitter chill into Spark for serialization. I tried the following,
but it threw java.lang.ClassCastException: com.twitter.chill.WrappedArraySerializer
cannot be cast to org.apache.spark.serializer.Serializer:

val conf = new SparkConf()
    .setAppName("Test Serialization")
    .set("spark.serializer", "com.twitter.chill.WrappedArraySerializer")


Well, what is the correct way of configuring Spark to use the Twitter chill
serialization framework?
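
For what it's worth, spark.serializer has to name a subclass of
org.apache.spark.serializer.Serializer, which is why pointing it at a chill Kryo
serializer ends in that ClassCastException. Spark's KryoSerializer is already built on
top of Twitter chill; chill (and other Kryo) serializers are registered through a
KryoRegistrator instead. A minimal sketch, reusing the xt.MyKryoRegistrator name from
the quoted code below:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .setAppName("Test Serialization")
      // Must be a Spark Serializer implementation, not a Kryo/chill serializer.
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      // Class (and chill serializer) registrations belong in the registrator.
      .set("spark.kryo.registrator", "xt.MyKryoRegistrator")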







2015-02-15 22:23 GMT+08:00 Ted Yu yuzhih...@gmail.com:

 I was looking at https://github.com/twitter/chill

 It seems this would achieve what you want:
 chill-scala/src/main/scala/com/twitter/chill/WrappedArraySerializer.scala

 Cheers

 On Sat, Feb 14, 2015 at 6:36 PM, Tao Xiao xiaotao.cs@gmail.com
 wrote:

 I'm using Spark 1.1.0 and find that *ImmutableBytesWritable* can be
 serialized by Kryo but *Array[ImmutableBytesWritable]* can't be
 serialized even when I registered both of them in Kryo.

 The code is as follows:

 val conf = new SparkConf()
     .setAppName("Hello Spark")
     .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
     .set("spark.kryo.registrator", "xt.MyKryoRegistrator")

 val sc = new SparkContext(conf)

 val rdd = sc.parallelize(List(
     (new ImmutableBytesWritable(Bytes.toBytes("AAA")), new KeyValue()),
     (new ImmutableBytesWritable(Bytes.toBytes("BBB")), new KeyValue()),
     (new ImmutableBytesWritable(Bytes.toBytes("CCC")), new KeyValue()),
     (new ImmutableBytesWritable(Bytes.toBytes("DDD")), new KeyValue())), 4)

 // snippet 1: a single object of *ImmutableBytesWritable* can be
 // serialized in a broadcast
 val partitioner = new SingleElementPartitioner(sc.broadcast(
     new ImmutableBytesWritable(Bytes.toBytes(3))))
 val ret = rdd.aggregateByKey(List[KeyValue](), partitioner)(
     (xs: List[KeyValue], y: KeyValue) => y :: xs,
     (xs: List[KeyValue], ys: List[KeyValue]) => xs ::: ys).persist()
 println("\n\n\nret.count = " + ret.count + ", partition size = " + ret.partitions.size)

 // snippet 2: an array of *ImmutableBytesWritable* cannot be
 // serialized in a broadcast
 val arr = Array(
     new ImmutableBytesWritable(Bytes.toBytes(1)),
     new ImmutableBytesWritable(Bytes.toBytes(2)),
     new ImmutableBytesWritable(Bytes.toBytes(3)))
 val newPartitioner = new ArrayPartitioner(sc.broadcast(arr))
 val ret1 = rdd.aggregateByKey(List[KeyValue](), newPartitioner)(
     (xs: List[KeyValue], y: KeyValue) => y :: xs,
     (xs: List[KeyValue], ys: List[KeyValue]) => xs ::: ys)
 println("\n\n\nrdd2.count = " + ret1.count)

 sc.stop


   // the following are kryo registrator and partitioners
class MyKryoRegistrator extends KryoRegistrator {
 override def registerClasses(kryo: Kryo): Unit = {
   kryo.register(classOf[ImmutableBytesWritable])         // register ImmutableBytesWritable
   kryo.register(classOf[Array[ImmutableBytesWritable]])  // register Array[ImmutableBytesWritable]
 }
}

class SingleElementPartitioner(bc:
 Broadcast[ImmutableBytesWritable]) extends Partitioner {
 override def numPartitions: Int = 5
 def v = Bytes.toInt(bc.value.get)
 override def getPartition(key: Any): Int =  v - 1
}


 class ArrayPartitioner(bc:
 Broadcast[Array[ImmutableBytesWritable]]) extends Partitioner {
 val arr = bc.value
 override def numPartitions: Int = arr.length
 override def getPartition(key: Any): Int =
 Bytes.toInt(arr(0).get)
 }



  In the code above, snippet 1 works as expected. But snippet 2 throws
  "Task not serializable: java.io.NotSerializableException:
  org.apache.hadoop.hbase.io.ImmutableBytesWritable".


 So do I have to implement a Kryo serializer for Array[T] if it is used in
 broadcast ?

 Thanks
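
One more thing that may be worth checking in the code above: SingleElementPartitioner
only stores the Broadcast handle (v is a def), while ArrayPartitioner does
val arr = bc.value, which turns the actual Array[ImmutableBytesWritable] into a field
of the partitioner. Partitioners travel with the shuffle dependency through Java
serialization, so that field alone can produce exactly this NotSerializableException,
independently of the Kryo registrations. A sketch of the lazier variant, assuming that
is indeed the difference:

    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.spark.Partitioner
    import org.apache.spark.broadcast.Broadcast

    class ArrayPartitioner(bc: Broadcast[Array[ImmutableBytesWritable]])
        extends Partitioner {
      // Resolve the array from the broadcast on demand instead of storing it as
      // a field, so only the (serializable) Broadcast handle is shipped with tasks.
      private def arr: Array[ImmutableBytesWritable] = bc.value
      override def numPartitions: Int = arr.length
      override def getPartition(key: Any): Int = Bytes.toInt(arr(0).get)
    }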








Spark Streaming and SQL checkpoint error: (java.io.NotSerializableException: org.apache.hadoop.hive.conf.HiveConf)

2015-02-16 Thread Haopu Wang
I have a streaming application which registers a temp table on a
HiveContext for each batch duration.

The application runs well in Spark 1.1.0, but I get the error below from
1.1.1.

Do you have any suggestions to resolve it? Thank you!

 

java.io.NotSerializableException: org.apache.hadoop.hive.conf.HiveConf

- field (class scala.Tuple2, name: _1, type: class
java.lang.Object)

- object (class scala.Tuple2, (Configuration: core-default.xml,
core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml,
yarn-site.xml, hdfs-default.xml, hdfs-site.xml,
org.apache.hadoop.hive.conf.LoopingByteArrayInputStream@2158ce23,
org.apache.hadoop.hive.ql.session.SessionState@49b6eef9))

- field (class org.apache.spark.sql.hive.HiveContext, name: x$3,
type: class scala.Tuple2)

- object (class org.apache.spark.sql.hive.HiveContext,
org.apache.spark.sql.hive.HiveContext@4e6e66a4)

- field (class
com.vitria.spark.streaming.api.scala.BaseQueryableDStream$$anonfun$registerTempTable$2,
name: sqlContext$1, type: class org.apache.spark.sql.SQLContext)

- object (class
com.vitria.spark.streaming.api.scala.BaseQueryableDStream$$anonfun$registerTempTable$2,
function1)

- field (class
org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1,
name: foreachFunc$1, type: interface scala.Function1)

- object (class
org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1,
function2)

- field (class org.apache.spark.streaming.dstream.ForEachDStream,
name: org$apache$spark$streaming$dstream$ForEachDStream$$foreachFunc,
type: interface scala.Function2)

- object (class org.apache.spark.streaming.dstream.ForEachDStream,
org.apache.spark.streaming.dstream.ForEachDStream@5ccbdc20)

- element of array (index: 0)

- array (class [Ljava.lang.Object;, size: 16)

- field (class scala.collection.mutable.ArrayBuffer, name:
array, type: class [Ljava.lang.Object;)

- object (class scala.collection.mutable.ArrayBuffer,
ArrayBuffer(org.apache.spark.streaming.dstream.ForEachDStream@5ccbdc20))

- field (class org.apache.spark.streaming.DStreamGraph, name:
outputStreams, type: class scala.collection.mutable.ArrayBuffer)

- custom writeObject data (class
org.apache.spark.streaming.DStreamGraph)

- object (class org.apache.spark.streaming.DStreamGraph,
org.apache.spark.streaming.DStreamGraph@776ae7da)

- field (class org.apache.spark.streaming.Checkpoint, name:
graph, type: class org.apache.spark.streaming.DStreamGraph)

- root object (class org.apache.spark.streaming.Checkpoint,
org.apache.spark.streaming.Checkpoint@5eade065)

at java.io.ObjectOutputStream.writeObject0(Unknown Source)
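
A minimal sketch of one way to keep the HiveContext out of the checkpointed DStream
graph: obtain it lazily from a singleton inside foreachRDD instead of capturing it in
the DStream closure (the stream type and the temp-table logic are placeholders for
whatever BaseQueryableDStream.registerTempTable does internally):

    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.hive.HiveContext
    import org.apache.spark.streaming.dstream.DStream

    object HiveContextSingleton {
      @transient private var instance: HiveContext = _
      def getInstance(sc: SparkContext): HiveContext = synchronized {
        if (instance == null) instance = new HiveContext(sc)
        instance
      }
    }

    def registerPerBatch(stream: DStream[String]): Unit = {
      stream.foreachRDD { rdd: RDD[String] =>
        // Re-created lazily on recovery; nothing Hive-related ends up in the checkpoint.
        val hiveContext = HiveContextSingleton.getInstance(rdd.sparkContext)
        // ... build a SchemaRDD from `rdd` and registerTempTable(...) here ...
      }
    }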

 



Re: Writing to HDFS from spark Streaming

2015-02-16 Thread Sean Owen
PS this is the real fix to this issue:

https://issues.apache.org/jira/browse/SPARK-5795

I'd like to merge it as I don't think it breaks the API; it actually
fixes it to work as intended.

On Mon, Feb 16, 2015 at 3:25 AM, Bahubali Jain bahub...@gmail.com wrote:
 I used the latest assembly jar and the below as suggested by Akhil to fix
 this problem...
 temp.saveAsHadoopFiles("DailyCSV", ".txt", String.class, String.class,
 (Class) TextOutputFormat.class);

 Thanks All for the help !

 On Wed, Feb 11, 2015 at 1:38 PM, Sean Owen so...@cloudera.com wrote:

 That kinda dodges the problem by ignoring generic types. But it may be
 simpler than the 'real' solution, which is a bit ugly.

 (But first, to double check, are you importing the correct
 TextOutputFormat? there are two versions. You use .mapred. with the
 old API and .mapreduce. with the new API.)

 Here's how I've formally casted around it in similar code:

 @SuppressWarnings("unchecked")
 Class<? extends OutputFormat<?, ?>> outputFormatClass =
     (Class<? extends OutputFormat<?, ?>>) (Class<?>) TextOutputFormat.class;

 and then pass that as the final argument.

 On Wed, Feb 11, 2015 at 6:35 AM, Akhil Das ak...@sigmoidanalytics.com
 wrote:
  Did you try :
 
   temp.saveAsHadoopFiles("DailyCSV", ".txt", String.class, String.class,
   (Class) TextOutputFormat.class);
 
  Thanks
  Best Regards
 
  On Wed, Feb 11, 2015 at 9:40 AM, Bahubali Jain bahub...@gmail.com
  wrote:
 
  Hi,
  I am facing issues while writing data from a streaming rdd to hdfs..
 
   JavaPairDStream<String, String> temp;
  ...
  ...
   temp.saveAsHadoopFiles("DailyCSV", ".txt", String.class,
   String.class, TextOutputFormat.class);
 
 
   I see compilation issues as below...
   The method saveAsHadoopFiles(String, String, Class<?>, Class<?>,
   Class<? extends OutputFormat<?, ?>>) in the type JavaPairDStream<String, String>
   is not applicable for the arguments (String, String, Class<String>,
   Class<String>, Class<TextOutputFormat>)
 
  I see same kind of problem even with saveAsNewAPIHadoopFiles API .
 
  Thanks,
  Baahu
 
 




 --
 Twitter:http://twitter.com/Baahu





Magic number 16: Why doesn't Spark Streaming process more than 16 files?

2015-02-16 Thread Emre Sevinc
Hello,

I have an application in Java that uses Spark Streaming 1.2.1 in the
following manner:

 - Listen to the input directory.
 - If a new file is copied to that input directory process it.
 - Process: contact a RESTful web service (also running locally and
responsive), send the contents of the file, receive the response from the
web service, and write the results as a new file into the output directory
 - batch interval : 30 seconds
 - checkpoint interval: 150 seconds

When I test the application locally with 1 or 2 files, it works perfectly
fine as expected. I run it like:

spark-submit --class myClass --verbose --master local[4]
--deploy-mode client myApp.jar /in file:///out
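
As a point of reference, a stripped-down sketch of the layout described above, in Scala
rather than Java, with placeholder paths; textFileStream stands in for whatever file
DStream the real application uses, and the web-service call is reduced to a comment:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val conf = new SparkConf().setAppName("DirectoryWatcher")
    val ssc = new StreamingContext(conf, Seconds(30))   // batch interval: 30 seconds
    ssc.checkpoint("file:///tmp/checkpoint")            // placeholder checkpoint dir

    val files = ssc.textFileStream("file:///in")        // watch the input directory
    files.checkpoint(Seconds(150))                      // checkpoint interval: 150 seconds

    val processed = files.map { line =>
      // contact the RESTful web service with the file contents and
      // return its response here
      line
    }
    processed.saveAsTextFiles("file:///out/result")     // one output directory per batch

    ssc.start()
    ssc.awaitTermination()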

But then I realized something strange when I copied 20 files to the
input directory: Spark Streaming detects all of the files, but it ends up
processing *only 16 files*. The remaining 4 are not processed at all.

I've tried it with 19, 18, and then 17 files. Same result, only 16 files
end up in the output directory.

Then I've tried it by copying 16 files at once to the input directory, and
it can process all of the 16 files. That's why I call it magic number 16.

When I say it detects all of the files, I mean that in the logs I see the
following lines when I copy 17 files:

===
2015-02-16 12:30:51 INFO  SpotlightDriver:70 - spark.executor.memory: 1G
2015-02-16 12:30:51 WARN  Utils:71 - Your hostname, emre-ubuntu resolves to
a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface eth0)
2015-02-16 12:30:51 WARN  Utils:71 - Set SPARK_LOCAL_IP if you need to bind
to another address
2015-02-16 12:30:52 INFO  Slf4jLogger:80 - Slf4jLogger started
2015-02-16 12:30:52 WARN  NativeCodeLoader:62 - Unable to load
native-hadoop library for your platform... using builtin-java classes where
applicable
2015-02-16 12:30:53 INFO  WriteAheadLogManager  for
ReceivedBlockHandlerMaster:59 - Recovered 2 write ahead log files from
file:/tmp/receivedBlockMetadata
2015-02-16 12:30:53 INFO  WriteAheadLogManager  for
ReceivedBlockHandlerMaster:59 - Reading from the logs:
file:/tmp/receivedBlockMetadata/log-1424086110599-1424086170599
file:/tmp/receivedBlockMetadata/log-1424086200861-1424086260861
---
Time: 142408626 ms
---

2015-02-16 12:31:00 INFO  WriteAheadLogManager  for
ReceivedBlockHandlerMaster:59 - Attempting to clear 0 old log files in
file:/tmp/receivedBlockMetadata older than 142408596:
2015-02-16 12:31:00 INFO  WriteAheadLogManager  for
ReceivedBlockHandlerMaster:59 - Cleared log files in
file:/tmp/receivedBlockMetadata older than 142408596
2015-02-16 12:31:00 INFO  WriteAheadLogManager  for
ReceivedBlockHandlerMaster:59 - Attempting to clear 0 old log files in
file:/tmp/receivedBlockMetadata older than 142408596:
2015-02-16 12:31:00 INFO  WriteAheadLogManager  for
ReceivedBlockHandlerMaster:59 - Cleared log files in
file:/tmp/receivedBlockMetadata older than 142408596
2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
process : 1
2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
process : 1
2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
process : 1
2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
process : 1
2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
process : 1
2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
process : 1
2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
process : 1
2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
process : 1
2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
process : 1
2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
process : 1
2015-02-16 12:31:30 INFO  FileInputFormat:280 - Total input paths to
process : 1
2015-02-16 12:31:31 INFO  FileInputFormat:280 - Total input paths to
process : 1
2015-02-16 12:31:31 INFO  FileInputFormat:280 - Total input paths to
process : 1
2015-02-16 12:31:31 INFO  FileInputFormat:280 - Total input paths to
process : 1
2015-02-16 12:31:31 INFO  FileInputFormat:280 - Total input paths to
process : 1
2015-02-16 12:31:31 INFO  FileInputFormat:280 - Total input paths to
process : 1
2015-02-16 12:31:31 INFO  FileInputFormat:280 - Total input paths to
process : 1
2015-02-16 12:31:31 INFO  WriteAheadLogManager  for
ReceivedBlockHandlerMaster:59 - Attempting to clear 0 old log files in
file:/tmp/receivedBlockMetadata older than 142408599:
2015-02-16 12:31:31 INFO  WriteAheadLogManager  for
ReceivedBlockHandlerMaster:59 - Cleared log files in
file:/tmp/receivedBlockMetadata older than 142408599

---

Time: 142408629 ms
---