Use of nscala-time within spark-shell
Hi all, thanks in advance for your help. I have a timestamp which I need to convert to a datetime using Scala. A folder contains the three needed jar files:

joda-convert-1.5.jar
joda-time-2.4.jar
nscala-time_2.11-1.8.0.jar

Using the Scala REPL and adding the jars:

scala -classpath *.jar

I can use nscala-time like the following:

scala> import com.github.nscala_time.time.Imports._
import com.github.nscala_time.time.Imports._

scala> import org.joda._
import org.joda._

scala> DateTime.now
res0: org.joda.time.DateTime = 2015-02-12T15:51:46.928+01:00

But when I try to use spark-shell:

ADD_JARS=/home/scala_test_class/nscala-time_2.11-1.8.0.jar,/home/scala_test_class/joda-time-2.4.jar,/home/scala_test_class/joda-convert-1.5.jar /usr/local/spark/bin/spark-shell --master local --driver-memory 2g --executor-memory 2g --executor-cores 1

it successfully imports the jars:

scala> import com.github.nscala_time.time.Imports._
import com.github.nscala_time.time.Imports._

scala> import org.joda._
import org.joda._

but fails when using them:

scala> DateTime.now
java.lang.NoSuchMethodError: scala.Predef$.$conforms()Lscala/Predef$$less$colon$less;
  at com.github.nscala_time.time.LowPriorityOrderingImplicits$class.ReadableInstantOrdering(Implicits.scala:69)
  at com.github.nscala_time.time.Imports$.ReadableInstantOrdering(Imports.scala:20)
  at com.github.nscala_time.time.OrderingImplicits$class.$init$(Implicits.scala:61)
  at com.github.nscala_time.time.Imports$.<init>(Imports.scala:20)
  at com.github.nscala_time.time.Imports$.<clinit>(Imports.scala)
  at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:17)
  at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:22)
  at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:24)
  at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:26)
  at $iwC$$iwC$$iwC$$iwC.<init>(<console>:28)
  at $iwC$$iwC$$iwC.<init>(<console>:30)
  at $iwC$$iwC.<init>(<console>:32)
  at $iwC.<init>(<console>:34)
  at <init>(<console>:36)
  at .<init>(<console>:40)
  at .<clinit>(<console>)
  at .<init>(<console>:7)
  at .<clinit>(<console>)
  at $print(<console>)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:606)
  at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852)
  at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1125)
  at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:674)
  at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:705)
  at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:669)
  at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:828)
  at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:873)
  at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:785)
  at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:628)
  at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:636)
  at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:641)
  at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:968)
  at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916)
  at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916)
  at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
  at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:916)
  at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1011)
  at org.apache.spark.repl.Main$.main(Main.scala:31)
  at org.apache.spark.repl.Main.main(Main.scala)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:606)
  at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
  at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
  at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Your help is very appreciated,
Regards,
Hammam
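A likely cause of this particular trace (an inference from the artifact names above, not stated in the thread): scala.Predef$.$conforms only exists in Scala 2.11, and nscala-time_2.11-1.8.0.jar is built for Scala 2.11, while the prebuilt Spark 1.x binaries run on Scala 2.10; the standalone REPL above is presumably a 2.11 installation, which is why the same jars work there. Swapping in the Scala 2.10 build of the library keeps the binary versions aligned:

ADD_JARS=/home/scala_test_class/nscala-time_2.10-1.8.0.jar,/home/scala_test_class/joda-time-2.4.jar,/home/scala_test_class/joda-convert-1.5.jar /usr/local/spark/bin/spark-shell --master local --driver-memory 2g --executor-memory 2g --executor-cores 1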
How to retrieve the value from sql.row by column name
Is it possible to reference a column from a SchemaRDD using the column's name instead of its number? For example, let's say I've created a SchemaRDD from an avro file:

val sqlContext = new SQLContext(sc)
import sqlContext._
val caper = sqlContext.avroFile("hdfs://localhost:9000/sma/raw_avro/caper")
caper.registerTempTable("caper")

scala> caper
res20: org.apache.spark.sql.SchemaRDD = SchemaRDD[0] at RDD at SchemaRDD.scala:108
== Query Plan ==
== Physical Plan ==
PhysicalRDD [ADMDISP#0,age#1,AMBSURG#2,apptdt_skew#3,APPTSTAT#4,APPTTYPE#5,ASSGNDUR#6,CANCSTAT#7,CAPERSTAT#8,COMPLAINT#9,CPT_1#10,CPT_10#11,CPT_11#12,CPT_12#13,CPT_13#14,CPT_2#15,CPT_3#16,CPT_4#17,CPT_5#18,CPT_6#19,CPT_7#20,CPT_8#21,CPT_9#22,CPTDX_1#23,CPTDX_10#24,CPTDX_11#25,CPTDX_12#26,CPTDX_13#27,CPTDX_2#28,CPTDX_3#29,CPTDX_4#30,CPTDX_5#31,CPTDX_6#32,CPTDX_7#33,CPTDX_8#34,CPTDX_9#35,CPTMOD1_1#36,CPTMOD1_10#37,CPTMOD1_11#38,CPTMOD1_12#39,CPTMOD1_13#40,CPTMOD1_2#41,CPTMOD1_3#42,CPTMOD1_4#43,CPTMOD1_5#44,CPTMOD1_6#45,CPTMOD1_7#46,CPTMOD1_8#47,CPTMOD1_9#48,CPTMOD2_1#49,CPTMOD2_10#50,CPTMOD2_11#51,CPTMOD2_12#52,CPTMOD2_13#53,CPTMOD2_2#54,CPTMOD2_3#55,CPTMOD2_4#56,CPTMOD...

Now I want to access fields, and of course the normal thing to do is to use a field name, not a field number.

scala> val kv = caper.map(r => (r.ran_id, r))
<console>:23: error: value ran_id is not a member of org.apache.spark.sql.Row
val kv = caper.map(r => (r.ran_id, r))

How do I do this?
Re: Can't I mix non-Spark properties into a .properties file and pass it to spark-submit via --properties-file?
We've been using Commons Configuration to pull our properties out of properties files and system properties (prioritizing system properties over the others), and we add those properties to our Spark conf explicitly; we use an argument parser to get the command-line argument for which property file to load. We also implicitly added an extra parse-args method to our SparkConf. In our main method, we do something like this:

val sparkConf = SparkConfFactory.newSparkConf.parseModuleArgs(args)
val sparkContext = new SparkContext(sparkConf)

Now all of our externally parsed properties are in the same Spark conf, so we can pull them off anywhere in the program that has access to an RDD/SparkContext or the Spark conf directly.

On Mon, Feb 16, 2015 at 10:42 AM, Sean Owen so...@cloudera.com wrote:

How about system properties? Or something like Typesafe Config, which lets you at least override something in a built-in config file on the command line, with props or other files.

On Mon, Feb 16, 2015 at 3:38 PM, Emre Sevinc emre.sev...@gmail.com wrote:

Sean, I'm trying this as an alternative to what I currently do. Currently I have my module.properties file for my module in the resources directory, and that file is put inside the über JAR file when I build my application with Maven, and then when I submit it using spark-submit, I can read that module.properties file via the traditional method:

properties.load(MyModule.class.getClassLoader().getResourceAsStream("module.properties"));

and everything works fine. The disadvantage is that in order to make any changes to that .properties file effective, I have to re-build my application. Therefore I'm trying to find a way to send that module.properties file via spark-submit and read the values in it, so that I will not be forced to re-build my application every time I want to make a change in the module.properties file. I've also checked the --files option of spark-submit, but I see that it is for sending the listed files to executors (correct me if I'm wrong); what I'm after is being able to pass dynamic properties (key/value pairs) to the driver program of my Spark application. And I still could not find out how to do that. -- Emre

On Mon, Feb 16, 2015 at 4:28 PM, Sean Owen so...@cloudera.com wrote:

Since SparkConf is only for Spark properties, I think it will in general only pay attention to, and preserve, spark.* properties. You could experiment with that. In general I wouldn't rely on Spark mechanisms for your configuration; you can use any config mechanism you like to retain your own properties.

On Mon, Feb 16, 2015 at 3:26 PM, Emre Sevinc emre.sev...@gmail.com wrote:

Hello, I'm using Spark 1.2.1 and have a module.properties file, and in it I have non-Spark properties as well as Spark properties, e.g.:

job.output.dir=file:///home/emre/data/mymodule/out

I'm trying to pass it to spark-submit via:

spark-submit --class com.myModule --master local[4] --deploy-mode client --verbose --properties-file /home/emre/data/mymodule.properties mymodule.jar

And I thought I could read the value of my non-Spark property, namely job.output.dir, by using:

SparkConf sparkConf = new SparkConf();
final String validatedJSONoutputDir = sparkConf.get("job.output.dir");

But it gives me an exception:

Exception in thread "main" java.util.NoSuchElementException: job.output.dir

Is it not possible to mix Spark and non-Spark properties in a single .properties file, then pass it via --properties-file and then get the values of those non-Spark properties via SparkConf?
Or is there another object / method to retrieve the values for those non-Spark properties?

-- Emre Sevinç
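A minimal sketch of the pattern described at the top of the thread (the implicit class, method name, and argument handling are illustrative, not an actual library API): an implicit class folds externally loaded properties into the SparkConf, after which non-Spark keys can be read back with sparkConf.get, since explicit set() calls are stored regardless of the spark.* prefix.

import java.io.FileInputStream
import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.spark.SparkConf

implicit class ModuleArgs(conf: SparkConf) {
  // args(0) is assumed to be the path to e.g. module.properties
  def parseModuleArgs(args: Array[String]): SparkConf = {
    val props = new Properties()
    props.load(new FileInputStream(args(0)))
    // copy every key/value pair into the SparkConf, spark.* or not
    props.asScala.foldLeft(conf) { case (c, (k, v)) => c.set(k, v) }
  }
}

// usage, mirroring the snippet at the top of the thread:
// val sparkConf = new SparkConf().parseModuleArgs(args)
// val outputDir = sparkConf.get("job.output.dir")  // now resolvable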
Re: How to retrieve the value from sql.row by column name
I am just learning Scala so I don't actually understand what your code snippet is doing, but thank you; I will learn more so I can figure it out. I am new to all of this and still trying to make the mental shift from normal programming to distributed programming, but it seems to me that the row object would know the schema object that it came from and be able to ask its schema to transform a name into a column number. Am I missing something, or is this just a matter of time constraints and this one just hasn't gotten into the queue yet? Barring that, do the schema classes provide methods for doing this? I've looked and didn't see anything. I've just discovered that the Python implementation of SchemaRDD does in fact allow for referencing by name and column. Why is this provided in the Python implementation but not the Scala or Java implementations?

Thanks,
--eric

On 02/16/2015 10:46 AM, Michael Armbrust wrote:

For efficiency the row objects don't contain the schema, so you can't get the column by name directly. I usually do a select followed by pattern matching. Something like the following:

caper.select('ran_id).map { case Row(ranId: String) => ... }

On Mon, Feb 16, 2015 at 8:54 AM, Eric Bell e...@ericjbell.com wrote:

Is it possible to reference a column from a SchemaRDD using the column's name instead of its number? For example, let's say I've created a SchemaRDD from an avro file:

val sqlContext = new SQLContext(sc)
import sqlContext._
val caper = sqlContext.avroFile("hdfs://localhost:9000/sma/raw_avro/caper")
caper.registerTempTable("caper")

scala> caper
res20: org.apache.spark.sql.SchemaRDD = SchemaRDD[0] at RDD at SchemaRDD.scala:108
== Query Plan ==
== Physical Plan ==
PhysicalRDD [ADMDISP#0,age#1,AMBSURG#2,apptdt_skew#3,APPTSTAT#4,APPTTYPE#5,ASSGNDUR#6,CANCSTAT#7,CAPERSTAT#8,COMPLAINT#9,CPT_1#10,CPT_10#11,CPT_11#12,CPT_12#13,CPT_13#14,CPT_2#15,CPT_3#16,CPT_4#17,CPT_5#18,CPT_6#19,CPT_7#20,CPT_8#21,CPT_9#22,CPTDX_1#23,CPTDX_10#24,CPTDX_11#25,CPTDX_12#26,CPTDX_13#27,CPTDX_2#28,CPTDX_3#29,CPTDX_4#30,CPTDX_5#31,CPTDX_6#32,CPTDX_7#33,CPTDX_8#34,CPTDX_9#35,CPTMOD1_1#36,CPTMOD1_10#37,CPTMOD1_11#38,CPTMOD1_12#39,CPTMOD1_13#40,CPTMOD1_2#41,CPTMOD1_3#42,CPTMOD1_4#43,CPTMOD1_5#44,CPTMOD1_6#45,CPTMOD1_7#46,CPTMOD1_8#47,CPTMOD1_9#48,CPTMOD2_1#49,CPTMOD2_10#50,CPTMOD2_11#51,CPTMOD2_12#52,CPTMOD2_13#53,CPTMOD2_2#54,CPTMOD2_3#55,CPTMOD2_4#56,CPTMOD...

Now I want to access fields, and of course the normal thing to do is to use a field name, not a field number.

scala> val kv = caper.map(r => (r.ran_id, r))
<console>:23: error: value ran_id is not a member of org.apache.spark.sql.Row
val kv = caper.map(r => (r.ran_id, r))

How do I do this?
Re: Array in broadcast can't be serialized
Is it possible to port WrappedArraySerializer.scala to your app? Pardon me for not knowing how to integrate Chill with Spark.

Cheers

On Mon, Feb 16, 2015 at 12:31 AM, Tao Xiao xiaotao.cs@gmail.com wrote:

Thanks Ted. After searching for a whole day, I still don't know how to let Spark use Twitter chill serialization -- there are very few documents about how to integrate Twitter chill into Spark for serialization. I tried the following, but an exception of "java.lang.ClassCastException: com.twitter.chill.WrappedArraySerializer cannot be cast to org.apache.spark.serializer.Serializer" was thrown:

val conf = new SparkConf()
  .setAppName("Test Serialization")
  .set("spark.serializer", "com.twitter.chill.WrappedArraySerializer")

Well, what is the correct way of configuring Spark to use the Twitter chill serialization framework?

2015-02-15 22:23 GMT+08:00 Ted Yu yuzhih...@gmail.com:

I was looking at https://github.com/twitter/chill

It seems this would achieve what you want: chill-scala/src/main/scala/com/twitter/chill/WrappedArraySerializer.scala

Cheers

On Sat, Feb 14, 2015 at 6:36 PM, Tao Xiao xiaotao.cs@gmail.com wrote:

I'm using Spark 1.1.0 and find that ImmutableBytesWritable can be serialized by Kryo but Array[ImmutableBytesWritable] can't be serialized even when I registered both of them in Kryo. The code is as follows:

val conf = new SparkConf()
  .setAppName("Hello Spark")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "xt.MyKryoRegistrator")
val sc = new SparkContext(conf)

val rdd = sc.parallelize(List(
  (new ImmutableBytesWritable(Bytes.toBytes("AAA")), new KeyValue()),
  (new ImmutableBytesWritable(Bytes.toBytes("BBB")), new KeyValue()),
  (new ImmutableBytesWritable(Bytes.toBytes("CCC")), new KeyValue()),
  (new ImmutableBytesWritable(Bytes.toBytes("DDD")), new KeyValue())), 4)

// snippet 1: a single object of ImmutableBytesWritable can be serialized in a broadcast
val partitioner = new SingleElementPartitioner(sc.broadcast(new ImmutableBytesWritable(Bytes.toBytes(3))))
val ret = rdd.aggregateByKey(List[KeyValue](), partitioner)(
    (xs: List[KeyValue], y: KeyValue) => y :: xs,
    (xs: List[KeyValue], ys: List[KeyValue]) => xs ::: ys
  ).persist()
println("\n\n\nret.count = " + ret.count + ", partition size = " + ret.partitions.size)

// snippet 2: an array of ImmutableBytesWritable can not be serialized in a broadcast
val arr = Array(
  new ImmutableBytesWritable(Bytes.toBytes(1)),
  new ImmutableBytesWritable(Bytes.toBytes(2)),
  new ImmutableBytesWritable(Bytes.toBytes(3)))
val newPartitioner = new ArrayPartitioner(sc.broadcast(arr))
val ret1 = rdd.aggregateByKey(List[KeyValue](), newPartitioner)(
    (xs: List[KeyValue], y: KeyValue) => y :: xs,
    (xs: List[KeyValue], ys: List[KeyValue]) => xs ::: ys
  )
println("\n\n\nrdd2.count = " + ret1.count)

sc.stop

// the following are the kryo registrator and partitioners
class MyKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[ImmutableBytesWritable])        // register ImmutableBytesWritable
    kryo.register(classOf[Array[ImmutableBytesWritable]]) // register Array[ImmutableBytesWritable]
  }
}

class SingleElementPartitioner(bc: Broadcast[ImmutableBytesWritable]) extends Partitioner {
  override def numPartitions: Int = 5
  def v = Bytes.toInt(bc.value.get)
  override def getPartition(key: Any): Int = v - 1
}

class ArrayPartitioner(bc: Broadcast[Array[ImmutableBytesWritable]]) extends Partitioner {
  val arr = bc.value
  override def numPartitions: Int = arr.length
  override def getPartition(key: Any): Int = Bytes.toInt(arr(0).get)
}

In the code above, snippet 1 works as expected, but snippet 2 throws "Task not serializable: java.io.NotSerializableException: org.apache.hadoop.hbase.io.ImmutableBytesWritable". So do I have to implement a Kryo serializer for Array[T] if it is used in a broadcast?

Thanks
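A hedged observation on snippet 2 (my reading, not confirmed in the thread): ArrayPartitioner copies bc.value into a field at construction time on the driver, so the raw Array[ImmutableBytesWritable], and not just the broadcast handle, ends up inside the partitioner, and partitioners travel in task closures that Spark serializes with Java serialization regardless of the spark.serializer setting. Deferring the bc.value call keeps only the (serializable) broadcast in the closure:

import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.Partitioner
import org.apache.spark.broadcast.Broadcast

// Sketch: same partitioner as snippet 2, but the broadcast is dereferenced
// lazily in each method call instead of being baked into a field.
class LazyArrayPartitioner(bc: Broadcast[Array[ImmutableBytesWritable]]) extends Partitioner {
  override def numPartitions: Int = bc.value.length
  override def getPartition(key: Any): Int = Bytes.toInt(bc.value(0).get)
}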
Spark newbie desires feedback on first program
I'm a Spark newbie working on his first attempt to write an ETL program. I could use some feedback to make sure I'm on the right path. I've written a basic proof of concept that runs without errors and seems to work, although I might be missing some issues when this is actually run on more than a single node.

I am working with data about people (actually healthcare patients). I have an RDD that contains multiple rows per person. My overall goal is to create a single Person object for each person in my data. In this example, I am serializing to JSON, mostly because this is what I know how to do at the moment.

Other than general feedback, is my use of the groupByKey() and mapValues() methods appropriate? Thanks!

import json

class Person:
    def __init__(self):
        self.mydata = {}
        self.cpts = []
        self.mydata['cpt'] = self.cpts

    def addRowData(self, dataRow):
        # Get the CPT codes
        cpt = dataRow.CPT_1
        if cpt:
            self.cpts.append(cpt)

    def serializeToJSON(self):
        return json.dumps(self.mydata)

def makeAPerson(rows):
    person = Person()
    for row in rows:
        person.addRowData(row)
    return person.serializeToJSON()

peopleRDD = caper_kv.groupByKey().mapValues(lambda personDataRows: makeAPerson(personDataRows))
peopleRDD.saveAsTextFile("hdfs://localhost:9000/sma/processJSON/people")
Problem getting pyspark-cassandra and pyspark working
Hello all,

Trying the example code from this package (https://github.com/Parsely/pyspark-cassandra), I always get this error... Can you see what I am doing wrong? From googling around it seems to be that the jar is not found somehow... The Spark log shows the JAR was processed at least. Thank you so much. I am using spark-1.2.1-bin-hadoop2.4.tgz.

test2.py is simply:

from pyspark.context import SparkConf
from pyspark_cassandra import CassandraSparkContext, saveToCassandra
conf = SparkConf().setAppName("PySpark Cassandra Sample Driver")
conf.set("spark.cassandra.connection.host", "devzero")
sc = CassandraSparkContext(conf=conf)

[root@devzero spark]# /usr/local/bin/docker-enter spark-master bash -c "/spark/bin/spark-submit --py-files /spark/pyspark_cassandra.py --jars /spark/pyspark-cassandra-0.1-SNAPSHOT.jar --driver-class-path /spark/pyspark-cassandra-0.1-SNAPSHOT.jar /spark/test2.py"
...
15/02/16 05:58:45 INFO Slf4jLogger: Slf4jLogger started
15/02/16 05:58:45 INFO Remoting: Starting remoting
15/02/16 05:58:45 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@devzero:38917]
15/02/16 05:58:45 INFO Utils: Successfully started service 'sparkDriver' on port 38917.
15/02/16 05:58:45 INFO SparkEnv: Registering MapOutputTracker
15/02/16 05:58:45 INFO SparkEnv: Registering BlockManagerMaster
15/02/16 05:58:45 INFO DiskBlockManager: Created local directory at /tmp/spark-6cdca68b-edec-4a31-b3c1-a7e9d60191e7/spark-0e977468-6e31-4bba-959a-135d9ebda193
15/02/16 05:58:45 INFO MemoryStore: MemoryStore started with capacity 265.4 MB
15/02/16 05:58:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/02/16 05:58:46 INFO HttpFileServer: HTTP File server directory is /tmp/spark-af61f7f5-7c0e-412c-8352-263338335fa5/spark-10b3891f-0321-44fe-ba60-1a8c102fd647
15/02/16 05:58:46 INFO HttpServer: Starting HTTP Server
15/02/16 05:58:46 INFO Utils: Successfully started service 'HTTP file server' on port 56642.
15/02/16 05:58:46 INFO Utils: Successfully started service 'SparkUI' on port 4040.
15/02/16 05:58:46 INFO SparkUI: Started SparkUI at http://devzero:4040
15/02/16 05:58:46 INFO SparkContext: Added JAR file:/spark/pyspark-cassandra-0.1-SNAPSHOT.jar at http://10.212.55.42:56642/jars/pyspark-cassandra-0.1-SNAPSHOT.jar with timestamp 1424066326632
15/02/16 05:58:46 INFO Utils: Copying /spark/test2.py to /tmp/spark-e8cc013e-faae-4208-8bcd-0bb6c00b1b6c/spark-54f2c41d-ae35-4efd-860c-2e5c60979b4c/test2.py
15/02/16 05:58:46 INFO SparkContext: Added file file:/spark/test2.py at http://10.212.55.42:56642/files/test2.py with timestamp 1424066326633
15/02/16 05:58:46 INFO Utils: Copying /spark/pyspark_cassandra.py to /tmp/spark-e8cc013e-faae-4208-8bcd-0bb6c00b1b6c/spark-54f2c41d-ae35-4efd-860c-2e5c60979b4c/pyspark_cassandra.py
15/02/16 05:58:46 INFO SparkContext: Added file file:/spark/pyspark_cassandra.py at http://10.212.55.42:56642/files/pyspark_cassandra.py with timestamp 1424066326642
15/02/16 05:58:46 INFO Executor: Starting executor ID driver on host localhost
15/02/16 05:58:46 INFO AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@devzero:38917/user/HeartbeatReceiver
15/02/16 05:58:46 INFO NettyBlockTransferService: Server created on 32895
15/02/16 05:58:46 INFO BlockManagerMaster: Trying to register BlockManager
15/02/16 05:58:46 INFO BlockManagerMasterActor: Registering block manager localhost:32895 with 265.4 MB RAM, BlockManagerId(driver, localhost, 32895)
15/02/16 05:58:46 INFO BlockManagerMaster: Registered BlockManager
15/02/16 05:58:47 INFO SparkUI: Stopped Spark web UI at http://devzero:4040
15/02/16 05:58:47 INFO DAGScheduler: Stopping DAGScheduler
15/02/16 05:58:48 INFO MapOutputTrackerMasterActor: MapOutputTrackerActor stopped!
15/02/16 05:58:48 INFO MemoryStore: MemoryStore cleared
15/02/16 05:58:48 INFO BlockManager: BlockManager stopped
15/02/16 05:58:48 INFO BlockManagerMaster: BlockManagerMaster stopped
15/02/16 05:58:48 INFO SparkContext: Successfully stopped SparkContext
15/02/16 05:58:48 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
15/02/16 05:58:48 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
15/02/16 05:58:48 INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
Traceback (most recent call last):
  File "/spark/test2.py", line 5, in <module>
    sc = CassandraSparkContext(conf=conf)
  File "/spark/python/pyspark/context.py", line 105, in __init__
    conf, jsc)
  File "/spark/pyspark_cassandra.py", line 17, in _do_init
    self._jcsc = self._jvm.CassandraJavaUtil.javaFunctions(self._jsc)
  File "/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 726, in __getattr__
py4j.protocol.Py4JError: Trying to call a package.
Re: Spark Streaming and SQL checkpoint error: (java.io.NotSerializableException: org.apache.hadoop.hive.conf.HiveConf)
You probably want to mark the HiveContext as @transient, as it's not valid to use it on the slaves anyway.

On Mon, Feb 16, 2015 at 1:58 AM, Haopu Wang hw...@qilinsoft.com wrote:

I have a streaming application which registers a temp table on a HiveContext for each batch duration. The application runs well in Spark 1.1.0, but I get the below error from 1.1.1. Do you have any suggestions to resolve it? Thank you!

java.io.NotSerializableException: org.apache.hadoop.hive.conf.HiveConf
- field (class scala.Tuple2, name: _1, type: class java.lang.Object)
- object (class scala.Tuple2, (Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml, org.apache.hadoop.hive.conf.LoopingByteArrayInputStream@2158ce23,org.apache.hadoop.hive.ql.session.SessionState@49b6eef9))
- field (class org.apache.spark.sql.hive.HiveContext, name: x$3, type: class scala.Tuple2)
- object (class org.apache.spark.sql.hive.HiveContext, org.apache.spark.sql.hive.HiveContext@4e6e66a4)
- field (class com.vitria.spark.streaming.api.scala.BaseQueryableDStream$$anonfun$registerTempTable$2, name: sqlContext$1, type: class org.apache.spark.sql.SQLContext)
- object (class com.vitria.spark.streaming.api.scala.BaseQueryableDStream$$anonfun$registerTempTable$2, <function1>)
- field (class org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1, name: foreachFunc$1, type: interface scala.Function1)
- object (class org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1, <function2>)
- field (class org.apache.spark.streaming.dstream.ForEachDStream, name: org$apache$spark$streaming$dstream$ForEachDStream$$foreachFunc, type: interface scala.Function2)
- object (class org.apache.spark.streaming.dstream.ForEachDStream, org.apache.spark.streaming.dstream.ForEachDStream@5ccbdc20)
- element of array (index: 0)
- array (class [Ljava.lang.Object;, size: 16)
- field (class scala.collection.mutable.ArrayBuffer, name: array, type: class [Ljava.lang.Object;)
- object (class scala.collection.mutable.ArrayBuffer, ArrayBuffer(org.apache.spark.streaming.dstream.ForEachDStream@5ccbdc20))
- field (class org.apache.spark.streaming.DStreamGraph, name: outputStreams, type: class scala.collection.mutable.ArrayBuffer)
- custom writeObject data (class org.apache.spark.streaming.DStreamGraph)
- object (class org.apache.spark.streaming.DStreamGraph, org.apache.spark.streaming.DStreamGraph@776ae7da)
- field (class org.apache.spark.streaming.Checkpoint, name: graph, type: class org.apache.spark.streaming.DStreamGraph)
- root object (class org.apache.spark.streaming.Checkpoint, org.apache.spark.streaming.Checkpoint@5eade065)
at java.io.ObjectOutputStream.writeObject0(Unknown Source)
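A minimal sketch of the suggested fix, assuming a foreachRDD/registerTempTable pattern like the one visible in the trace (the Event type, the stream, and the table name are all illustrative):

import org.apache.spark.sql.hive.HiveContext

case class Event(id: Int, name: String)

// ssc is the StreamingContext and eventStream a DStream[Event], both assumed.
// @transient excludes the HiveContext from the DStream graph that gets
// serialized into the checkpoint; it is only usable on the driver anyway.
@transient val hiveContext = new HiveContext(ssc.sparkContext)
import hiveContext.createSchemaRDD

eventStream.foreachRDD { rdd =>
  // this body runs on the driver, where the HiveContext is valid
  createSchemaRDD(rdd).registerTempTable("events")
}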
Re: spark-local dir running out of space during long ALS run
Correct, brute-force cleanup is not useful. Since Spark 1.0, Spark can do automatic cleanup of files based on which RDDs are used/garbage-collected by the JVM. That would be the best way, but it depends on the JVM GC characteristics. If you force a GC periodically in the driver, that might help you get rid of files in the workers that are no longer needed.

TD

On Mon, Feb 16, 2015 at 12:27 AM, Antony Mayi antonym...@yahoo.com.invalid wrote:

spark.cleaner.ttl is not the right way -- it seems to be really designed for streaming. Although it keeps the disk usage under control, it also causes loss of RDDs and broadcasts that are required later, leading to a crash. Is there any other way?

thanks,
Antony.

On Sunday, 15 February 2015, 21:42, Antony Mayi antonym...@yahoo.com wrote:

spark.cleaner.ttl ?

On Sunday, 15 February 2015, 18:23, Antony Mayi antonym...@yahoo.com wrote:

Hi, I am running a bigger ALS job on Spark 1.2.0 on YARN (CDH 5.3.0) -- the ALS is using about 3 billion ratings and I am doing several trainImplicit() runs in a loop within one Spark session. I have a four-node cluster with 3TB of disk space on each. Before starting the job, less than 8% of the disk space is used. While the ALS is running I can see the disk usage rapidly growing, mainly because of files being stored under yarn/local/usercache/user/appcache/application_XXX_YYY/spark-local-ZZZ-AAA. After about 10 hours the disk usage hits 90% and YARN kills the particular containers. Am I missing some cleanup somewhere while looping over the several trainImplicit() calls? Taking 4*3TB of disk space seems immense.

thanks for any help,
Antony.
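A hedged sketch of TD's suggestion applied to the training loop described above (variable names and values are illustrative): a periodic System.gc() in the driver lets Spark's cleaner notice unreferenced RDDs and broadcasts and delete their files on the workers.

import org.apache.spark.mllib.recommendation.{ALS, Rating}

// ratings: RDD[Rating]; numRuns, rank, and numIterations are placeholders
for (run <- 1 to numRuns) {
  val model = ALS.trainImplicit(ratings, rank, numIterations)
  // ... evaluate the model, then let references to it go out of scope ...
  System.gc()  // nudges cleanup of files for RDDs/broadcasts no longer referenced
}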
Re: How to retrieve the value from sql.row by column name
For efficiency the row objects don't contain the schema, so you can't get the column by name directly. I usually do a select followed by pattern matching. Something like the following:

caper.select('ran_id).map { case Row(ranId: String) => ... }

On Mon, Feb 16, 2015 at 8:54 AM, Eric Bell e...@ericjbell.com wrote:

Is it possible to reference a column from a SchemaRDD using the column's name instead of its number? For example, let's say I've created a SchemaRDD from an avro file:

val sqlContext = new SQLContext(sc)
import sqlContext._
val caper = sqlContext.avroFile("hdfs://localhost:9000/sma/raw_avro/caper")
caper.registerTempTable("caper")

scala> caper
res20: org.apache.spark.sql.SchemaRDD = SchemaRDD[0] at RDD at SchemaRDD.scala:108
== Query Plan ==
== Physical Plan ==
PhysicalRDD [ADMDISP#0,age#1,AMBSURG#2,apptdt_skew#3,APPTSTAT#4,APPTTYPE#5,ASSGNDUR#6,CANCSTAT#7,CAPERSTAT#8,COMPLAINT#9,CPT_1#10,CPT_10#11,CPT_11#12,CPT_12#13,CPT_13#14,CPT_2#15,CPT_3#16,CPT_4#17,CPT_5#18,CPT_6#19,CPT_7#20,CPT_8#21,CPT_9#22,CPTDX_1#23,CPTDX_10#24,CPTDX_11#25,CPTDX_12#26,CPTDX_13#27,CPTDX_2#28,CPTDX_3#29,CPTDX_4#30,CPTDX_5#31,CPTDX_6#32,CPTDX_7#33,CPTDX_8#34,CPTDX_9#35,CPTMOD1_1#36,CPTMOD1_10#37,CPTMOD1_11#38,CPTMOD1_12#39,CPTMOD1_13#40,CPTMOD1_2#41,CPTMOD1_3#42,CPTMOD1_4#43,CPTMOD1_5#44,CPTMOD1_6#45,CPTMOD1_7#46,CPTMOD1_8#47,CPTMOD1_9#48,CPTMOD2_1#49,CPTMOD2_10#50,CPTMOD2_11#51,CPTMOD2_12#52,CPTMOD2_13#53,CPTMOD2_2#54,CPTMOD2_3#55,CPTMOD2_4#56,CPTMOD...

Now I want to access fields, and of course the normal thing to do is to use a field name, not a field number.

scala> val kv = caper.map(r => (r.ran_id, r))
<console>:23: error: value ran_id is not a member of org.apache.spark.sql.Row
val kv = caper.map(r => (r.ran_id, r))

How do I do this?
Re: How to retrieve the value from sql.row by column name
I can unpack the code snippet a bit:

caper.select('ran_id) is the same as saying SELECT ran_id FROM table in SQL. It's always a good idea to explicitly request the columns you need right before using them. That way you are tolerant of any changes to the schema that might happen upstream.

The next part, .map { case Row(ranId: String) => ... }, is doing an extraction to pull the values of the row out into typed variables. This is the same as doing .map(row => row(0).asInstanceOf[String]) or .map(row => row.getString(0)), but I find this syntax easier to read since it lines up nicely with the select clause that comes right before it. It's also less verbose, especially when pulling out a bunch of columns.

Regarding the differences between Python and Java/Scala, part of this is just due to the nature of these languages. Since Java/Scala are statically typed, you will always have to explicitly say the type of the column you are extracting (the bonus here is that they are much faster than Python due to optimizations this strictness allows). However, since it's already a little more verbose, we decided not to have the more expensive ability to look up columns in a row by name, and instead went with a faster ordinal-based API. We could revisit this, but it's not currently something we are planning to change.

Michael

On Mon, Feb 16, 2015 at 11:04 AM, Eric Bell e...@ericjbell.com wrote:

I am just learning Scala so I don't actually understand what your code snippet is doing, but thank you; I will learn more so I can figure it out. I am new to all of this and still trying to make the mental shift from normal programming to distributed programming, but it seems to me that the row object would know the schema object that it came from and be able to ask its schema to transform a name into a column number. Am I missing something, or is this just a matter of time constraints and this one just hasn't gotten into the queue yet? Barring that, do the schema classes provide methods for doing this? I've looked and didn't see anything. I've just discovered that the Python implementation of SchemaRDD does in fact allow for referencing by name and column. Why is this provided in the Python implementation but not the Scala or Java implementations?

Thanks,
--eric

On 02/16/2015 10:46 AM, Michael Armbrust wrote:

For efficiency the row objects don't contain the schema, so you can't get the column by name directly. I usually do a select followed by pattern matching. Something like the following:

caper.select('ran_id).map { case Row(ranId: String) => ... }

On Mon, Feb 16, 2015 at 8:54 AM, Eric Bell e...@ericjbell.com wrote:

Is it possible to reference a column from a SchemaRDD using the column's name instead of its number? For example, let's say I've created a SchemaRDD from an avro file:

val sqlContext = new SQLContext(sc)
import sqlContext._
val caper = sqlContext.avroFile("hdfs://localhost:9000/sma/raw_avro/caper")
caper.registerTempTable("caper")

scala> caper
res20: org.apache.spark.sql.SchemaRDD = SchemaRDD[0] at RDD at SchemaRDD.scala:108
== Query Plan ==
== Physical Plan ==
PhysicalRDD [ADMDISP#0,age#1,AMBSURG#2,apptdt_skew#3,APPTSTAT#4,APPTTYPE#5,ASSGNDUR#6,CANCSTAT#7,CAPERSTAT#8,COMPLAINT#9,CPT_1#10,CPT_10#11,CPT_11#12,CPT_12#13,CPT_13#14,CPT_2#15,CPT_3#16,CPT_4#17,CPT_5#18,CPT_6#19,CPT_7#20,CPT_8#21,CPT_9#22,CPTDX_1#23,CPTDX_10#24,CPTDX_11#25,CPTDX_12#26,CPTDX_13#27,CPTDX_2#28,CPTDX_3#29,CPTDX_4#30,CPTDX_5#31,CPTDX_6#32,CPTDX_7#33,CPTDX_8#34,CPTDX_9#35,CPTMOD1_1#36,CPTMOD1_10#37,CPTMOD1_11#38,CPTMOD1_12#39,CPTMOD1_13#40,CPTMOD1_2#41,CPTMOD1_3#42,CPTMOD1_4#43,CPTMOD1_5#44,CPTMOD1_6#45,CPTMOD1_7#46,CPTMOD1_8#47,CPTMOD1_9#48,CPTMOD2_1#49,CPTMOD2_10#50,CPTMOD2_11#51,CPTMOD2_12#52,CPTMOD2_13#53,CPTMOD2_2#54,CPTMOD2_3#55,CPTMOD2_4#56,CPTMOD...

Now I want to access fields, and of course the normal thing to do is to use a field name, not a field number.

scala> val kv = caper.map(r => (r.ran_id, r))
<console>:23: error: value ran_id is not a member of org.apache.spark.sql.Row
val kv = caper.map(r => (r.ran_id, r))

How do I do this?
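To make the select-then-extract pattern above concrete, a slightly fuller sketch (the second column and its type are illustrative, not from the schema in the thread):

import org.apache.spark.sql.Row

// pattern-matching extraction: one typed variable per selected column
val pairs = caper.select('ran_id, 'age).map {
  case Row(ranId: String, age: Int) => (ranId, age)
}

// equivalent ordinal-based form
val pairs2 = caper.select('ran_id, 'age).map(row => (row.getString(0), row.getInt(1)))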
Re: Problem getting pyspark-cassandra and pyspark working
Yes, I am sure the system can't find the jar... but how do I fix that? My submit command includes the jar:

/spark/bin/spark-submit --py-files /spark/pyspark_cassandra.py --jars /spark/pyspark-cassandra-0.1-SNAPSHOT.jar --driver-class-path /spark/pyspark-cassandra-0.1-SNAPSHOT.jar /spark/test2.py

and the Spark output seems to indicate it is handling it:

15/02/16 05:58:46 INFO SparkContext: Added JAR file:/spark/pyspark-cassandra-0.1-SNAPSHOT.jar at http://10.212.55.42:56642/jars/pyspark-cassandra-0.1-SNAPSHOT.jar with timestamp 1424066326632

I don't really know what else I could try; any suggestions highly appreciated.

Thanks,
Mohamed.

On Mon, Feb 16, 2015 at 4:04 PM, Davies Liu dav...@databricks.com wrote:

It seems that the jar for cassandra is not loaded; you should have them in the classpath.

On Mon, Feb 16, 2015 at 12:08 PM, Mohamed Lrhazi mohamed.lrh...@georgetown.edu wrote:

Hello all, Trying the example code from this package (https://github.com/Parsely/pyspark-cassandra), I always get this error... Can you see what I am doing wrong? From googling around it seems to be that the jar is not found somehow... The Spark log shows the JAR was processed at least. Thank you so much. I am using spark-1.2.1-bin-hadoop2.4.tgz.

test2.py is simply:

from pyspark.context import SparkConf
from pyspark_cassandra import CassandraSparkContext, saveToCassandra
conf = SparkConf().setAppName("PySpark Cassandra Sample Driver")
conf.set("spark.cassandra.connection.host", "devzero")
sc = CassandraSparkContext(conf=conf)

[root@devzero spark]# /usr/local/bin/docker-enter spark-master bash -c "/spark/bin/spark-submit --py-files /spark/pyspark_cassandra.py --jars /spark/pyspark-cassandra-0.1-SNAPSHOT.jar --driver-class-path /spark/pyspark-cassandra-0.1-SNAPSHOT.jar /spark/test2.py"
...
15/02/16 05:58:45 INFO Slf4jLogger: Slf4jLogger started
15/02/16 05:58:45 INFO Remoting: Starting remoting
15/02/16 05:58:45 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@devzero:38917]
15/02/16 05:58:45 INFO Utils: Successfully started service 'sparkDriver' on port 38917.
15/02/16 05:58:45 INFO SparkEnv: Registering MapOutputTracker
15/02/16 05:58:45 INFO SparkEnv: Registering BlockManagerMaster
15/02/16 05:58:45 INFO DiskBlockManager: Created local directory at /tmp/spark-6cdca68b-edec-4a31-b3c1-a7e9d60191e7/spark-0e977468-6e31-4bba-959a-135d9ebda193
15/02/16 05:58:45 INFO MemoryStore: MemoryStore started with capacity 265.4 MB
15/02/16 05:58:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/02/16 05:58:46 INFO HttpFileServer: HTTP File server directory is /tmp/spark-af61f7f5-7c0e-412c-8352-263338335fa5/spark-10b3891f-0321-44fe-ba60-1a8c102fd647
15/02/16 05:58:46 INFO HttpServer: Starting HTTP Server
15/02/16 05:58:46 INFO Utils: Successfully started service 'HTTP file server' on port 56642.
15/02/16 05:58:46 INFO Utils: Successfully started service 'SparkUI' on port 4040.
15/02/16 05:58:46 INFO SparkUI: Started SparkUI at http://devzero:4040
15/02/16 05:58:46 INFO SparkContext: Added JAR file:/spark/pyspark-cassandra-0.1-SNAPSHOT.jar at http://10.212.55.42:56642/jars/pyspark-cassandra-0.1-SNAPSHOT.jar with timestamp 1424066326632
15/02/16 05:58:46 INFO Utils: Copying /spark/test2.py to /tmp/spark-e8cc013e-faae-4208-8bcd-0bb6c00b1b6c/spark-54f2c41d-ae35-4efd-860c-2e5c60979b4c/test2.py
15/02/16 05:58:46 INFO SparkContext: Added file file:/spark/test2.py at http://10.212.55.42:56642/files/test2.py with timestamp 1424066326633
15/02/16 05:58:46 INFO Utils: Copying /spark/pyspark_cassandra.py to /tmp/spark-e8cc013e-faae-4208-8bcd-0bb6c00b1b6c/spark-54f2c41d-ae35-4efd-860c-2e5c60979b4c/pyspark_cassandra.py
15/02/16 05:58:46 INFO SparkContext: Added file file:/spark/pyspark_cassandra.py at http://10.212.55.42:56642/files/pyspark_cassandra.py with timestamp 1424066326642
15/02/16 05:58:46 INFO Executor: Starting executor ID driver on host localhost
15/02/16 05:58:46 INFO AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@devzero:38917/user/HeartbeatReceiver
15/02/16 05:58:46 INFO NettyBlockTransferService: Server created on 32895
15/02/16 05:58:46 INFO BlockManagerMaster: Trying to register BlockManager
15/02/16 05:58:46 INFO BlockManagerMasterActor: Registering block manager localhost:32895 with 265.4 MB RAM, BlockManagerId(driver, localhost, 32895)
15/02/16 05:58:46 INFO BlockManagerMaster: Registered BlockManager
15/02/16 05:58:47 INFO SparkUI: Stopped Spark web UI at http://devzero:4040
15/02/16 05:58:47 INFO DAGScheduler: Stopping DAGScheduler
15/02/16 05:58:48 INFO MapOutputTrackerMasterActor: MapOutputTrackerActor stopped!
15/02/16 05:58:48 INFO MemoryStore: MemoryStore cleared
15/02/16 05:58:48 INFO BlockManager: BlockManager stopped
Re: Spark newbie desires feedback on first program
I cannot comment on the correctness of Python code. I will assume your caper_kv is keyed on something that uniquely identifies all the rows that make up the person's record, so your group by key makes sense, as does the map. (I will also assume all of the rows that comprise a single person's record will always fit in memory. If not, you will need another approach.)

You should be able to get away with removing the localhost:9000 from your HDFS URL, i.e., hdfs:///sma/processJSON/people, and let your HDFS configuration for Spark supply the missing pieces.

On Mon Feb 16 2015 at 3:38:31 PM Eric Bell e...@ericjbell.com wrote:

I'm a Spark newbie working on his first attempt to write an ETL program. I could use some feedback to make sure I'm on the right path. I've written a basic proof of concept that runs without errors and seems to work, although I might be missing some issues when this is actually run on more than a single node.

I am working with data about people (actually healthcare patients). I have an RDD that contains multiple rows per person. My overall goal is to create a single Person object for each person in my data. In this example, I am serializing to JSON, mostly because this is what I know how to do at the moment.

Other than general feedback, is my use of the groupByKey() and mapValues() methods appropriate? Thanks!

import json

class Person:
    def __init__(self):
        self.mydata = {}
        self.cpts = []
        self.mydata['cpt'] = self.cpts

    def addRowData(self, dataRow):
        # Get the CPT codes
        cpt = dataRow.CPT_1
        if cpt:
            self.cpts.append(cpt)

    def serializeToJSON(self):
        return json.dumps(self.mydata)

def makeAPerson(rows):
    person = Person()
    for row in rows:
        person.addRowData(row)
    return person.serializeToJSON()

peopleRDD = caper_kv.groupByKey().mapValues(lambda personDataRows: makeAPerson(personDataRows))
peopleRDD.saveAsTextFile("hdfs://localhost:9000/sma/processJSON/people")
Re: Spark newbie desires feedback on first program
My first problem was somewhat similar to yours. You won't find a whole lot of JDBC-to-Spark examples, since I think a lot of the adoption of Spark is by teams already experienced with Hadoop who already have an established big data solution (so their data is already extracted from whatever sources, e.g., log files, Hive, other M/R jobs). JDBC support is somewhat... lacking.

Our application uses a 12-node PostgreSQL distributed RDBMS that is sharded at the application tier. I had to write my own JDBC RDD to support this logical schema. However, because you are coming from a single MySQL DB, you should be able to get away with using the JdbcRDD[1]... but I cannot find a reference to it for the Python API, so someone familiar with using Python and Spark will have to chime in on that.

You need to consider _how_ the data gets from MySQL to the workers. It might work to pull all of the data to a single node and then parallelize that data across the cluster, but it's not going to be as efficient as range querying from each worker in the cluster to the database. If you're working with TBs of data then you will see very big benefits by distributing the data across workers from the get-go; if you don't, your startup code for each execution will take however long it takes to copy all the data to a single worker and distribute it. (By range querying what I mean is basically what the JdbcRDD does - it forces you to include a conditional statement like id >= ? AND id <= ? in your SQL, which it fills in at each worker so each worker only gets a piece of the pie.) The JdbcRDD makes assumptions about numeric keys for range querying.

The next thing to consider is, if you're going against your production database, will massive reads cause degradation for production users? I am using read replicas to mitigate this for our production installation, as copying TBs of data out of PostgreSQL would have some negative effect on our users. Running your jobs during low traffic is obviously an option here, as is restoring a read-only version from backup and explicitly querying that instance (in which case parallelizing user IDs and querying MySQL directly might get you near to the JdbcRDD behavior). And of course if the MySQL instance is already your analytics solution, then query on.

1. https://spark.apache.org/docs/1.1.0/api/java/org/apache/spark/rdd/JdbcRDD.html

On Mon Feb 16 2015 at 4:42:30 PM Eric Bell e...@ericjbell.com wrote:

Thanks Charles. I just realized a few minutes ago that I neglected to show the step where I generated the key on the person ID. Thanks for the pointer on the HDFS URL.

The next step is to process data from multiple RDDs. My data originates from 7 tables in a MySQL database. I used sqoop to create avro files from these tables, and in turn created RDDs using SparkSQL from the avro files. Since groupByKey only operates on a single RDD, I'm not quite sure yet how I'm going to process 7 tables as a transformation to get all the data I need into my objects.

I'm vacillating on whether I should be doing it this way, or if it would be a lot simpler to query MySQL to get all the Person IDs, parallelize them, and have my Person class make queries directly to the MySQL database. Since in theory I only have to do this once, I'm not sure there's much to be gained by moving the data from MySQL to Spark first. I have yet to find any non-trivial examples of ETL logic on the web ... it seems like it's mostly word-count map-reduce replacements.

On 02/16/2015 01:32 PM, Charles Feduke wrote:

I cannot comment on the correctness of Python code. I will assume your caper_kv is keyed on something that uniquely identifies all the rows that make up the person's record, so your group by key makes sense, as does the map. (I will also assume all of the rows that comprise a single person's record will always fit in memory. If not, you will need another approach.) You should be able to get away with removing the localhost:9000 from your HDFS URL, i.e., hdfs:///sma/processJSON/people, and let your HDFS configuration for Spark supply the missing pieces.
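To make the JdbcRDD range-query behavior concrete, a hedged Scala sketch (connection details, table, columns, and key bounds are all illustrative):

import java.sql.DriverManager
import org.apache.spark.rdd.JdbcRDD

// Each of the 8 partitions runs its own bounded query against MySQL,
// so the load is spread across workers instead of one big copy step.
val people = new JdbcRDD(
  sc,
  () => DriverManager.getConnection("jdbc:mysql://dbhost/clinic", "user", "pass"),
  "SELECT person_id, CPT_1 FROM caper WHERE person_id >= ? AND person_id <= ?",
  1L,        // lower bound of the key range
  1000000L,  // upper bound of the key range
  8,         // number of partitions
  rs => (rs.getLong("person_id"), rs.getString("CPT_1"))
)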
Re: Problem getting pyspark-cassandra and pyspark working
It also needs the Cassandra jar: com.datastax.spark.connector.CassandraJavaUtil. Is it included in /spark/pyspark-cassandra-0.1-SNAPSHOT.jar?

On Mon, Feb 16, 2015 at 1:20 PM, Mohamed Lrhazi mohamed.lrh...@georgetown.edu wrote:

Yes, I am sure the system can't find the jar... but how do I fix that? My submit command includes the jar:

/spark/bin/spark-submit --py-files /spark/pyspark_cassandra.py --jars /spark/pyspark-cassandra-0.1-SNAPSHOT.jar --driver-class-path /spark/pyspark-cassandra-0.1-SNAPSHOT.jar /spark/test2.py

and the Spark output seems to indicate it is handling it:

15/02/16 05:58:46 INFO SparkContext: Added JAR file:/spark/pyspark-cassandra-0.1-SNAPSHOT.jar at http://10.212.55.42:56642/jars/pyspark-cassandra-0.1-SNAPSHOT.jar with timestamp 1424066326632

I don't really know what else I could try; any suggestions highly appreciated.

Thanks,
Mohamed.

On Mon, Feb 16, 2015 at 4:04 PM, Davies Liu dav...@databricks.com wrote:

It seems that the jar for cassandra is not loaded; you should have them in the classpath.

On Mon, Feb 16, 2015 at 12:08 PM, Mohamed Lrhazi mohamed.lrh...@georgetown.edu wrote:

Hello all, Trying the example code from this package (https://github.com/Parsely/pyspark-cassandra), I always get this error... Can you see what I am doing wrong? From googling around it seems to be that the jar is not found somehow... The Spark log shows the JAR was processed at least. Thank you so much. I am using spark-1.2.1-bin-hadoop2.4.tgz.

test2.py is simply:

from pyspark.context import SparkConf
from pyspark_cassandra import CassandraSparkContext, saveToCassandra
conf = SparkConf().setAppName("PySpark Cassandra Sample Driver")
conf.set("spark.cassandra.connection.host", "devzero")
sc = CassandraSparkContext(conf=conf)

[root@devzero spark]# /usr/local/bin/docker-enter spark-master bash -c "/spark/bin/spark-submit --py-files /spark/pyspark_cassandra.py --jars /spark/pyspark-cassandra-0.1-SNAPSHOT.jar --driver-class-path /spark/pyspark-cassandra-0.1-SNAPSHOT.jar /spark/test2.py"
...
15/02/16 05:58:45 INFO Slf4jLogger: Slf4jLogger started
15/02/16 05:58:45 INFO Remoting: Starting remoting
15/02/16 05:58:45 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@devzero:38917]
15/02/16 05:58:45 INFO Utils: Successfully started service 'sparkDriver' on port 38917.
15/02/16 05:58:45 INFO SparkEnv: Registering MapOutputTracker
15/02/16 05:58:45 INFO SparkEnv: Registering BlockManagerMaster
15/02/16 05:58:45 INFO DiskBlockManager: Created local directory at /tmp/spark-6cdca68b-edec-4a31-b3c1-a7e9d60191e7/spark-0e977468-6e31-4bba-959a-135d9ebda193
15/02/16 05:58:45 INFO MemoryStore: MemoryStore started with capacity 265.4 MB
15/02/16 05:58:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/02/16 05:58:46 INFO HttpFileServer: HTTP File server directory is /tmp/spark-af61f7f5-7c0e-412c-8352-263338335fa5/spark-10b3891f-0321-44fe-ba60-1a8c102fd647
15/02/16 05:58:46 INFO HttpServer: Starting HTTP Server
15/02/16 05:58:46 INFO Utils: Successfully started service 'HTTP file server' on port 56642.
15/02/16 05:58:46 INFO Utils: Successfully started service 'SparkUI' on port 4040.
15/02/16 05:58:46 INFO SparkUI: Started SparkUI at http://devzero:4040
15/02/16 05:58:46 INFO SparkContext: Added JAR file:/spark/pyspark-cassandra-0.1-SNAPSHOT.jar at http://10.212.55.42:56642/jars/pyspark-cassandra-0.1-SNAPSHOT.jar with timestamp 1424066326632
15/02/16 05:58:46 INFO Utils: Copying /spark/test2.py to /tmp/spark-e8cc013e-faae-4208-8bcd-0bb6c00b1b6c/spark-54f2c41d-ae35-4efd-860c-2e5c60979b4c/test2.py
15/02/16 05:58:46 INFO SparkContext: Added file file:/spark/test2.py at http://10.212.55.42:56642/files/test2.py with timestamp 1424066326633
15/02/16 05:58:46 INFO Utils: Copying /spark/pyspark_cassandra.py to /tmp/spark-e8cc013e-faae-4208-8bcd-0bb6c00b1b6c/spark-54f2c41d-ae35-4efd-860c-2e5c60979b4c/pyspark_cassandra.py
15/02/16 05:58:46 INFO SparkContext: Added file file:/spark/pyspark_cassandra.py at http://10.212.55.42:56642/files/pyspark_cassandra.py with timestamp 1424066326642
15/02/16 05:58:46 INFO Executor: Starting executor ID driver on host localhost
15/02/16 05:58:46 INFO AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@devzero:38917/user/HeartbeatReceiver
15/02/16 05:58:46 INFO NettyBlockTransferService: Server created on 32895
15/02/16 05:58:46 INFO BlockManagerMaster: Trying to register BlockManager
15/02/16 05:58:46 INFO BlockManagerMasterActor: Registering block manager localhost:32895 with 265.4 MB RAM, BlockManagerId(driver, localhost, 32895)
15/02/16 05:58:46 INFO BlockManagerMaster: Registered BlockManager
15/02/16 05:58:47 INFO SparkUI: Stopped Spark web UI at http://devzero:4040
15/02/16
Re: Spark newbie desires feedback on first program
Thanks Charles. I just realized a few minutes ago that I neglected to show the step where I generated the key on the person ID. Thanks for the pointer on the HDFS URL.

The next step is to process data from multiple RDDs. My data originates from 7 tables in a MySQL database. I used sqoop to create avro files from these tables, and in turn created RDDs using SparkSQL from the avro files. Since groupByKey only operates on a single RDD, I'm not quite sure yet how I'm going to process 7 tables as a transformation to get all the data I need into my objects.

I'm vacillating on whether I should be doing it this way, or if it would be a lot simpler to query MySQL to get all the Person IDs, parallelize them, and have my Person class make queries directly to the MySQL database. Since in theory I only have to do this once, I'm not sure there's much to be gained by moving the data from MySQL to Spark first. I have yet to find any non-trivial examples of ETL logic on the web ... it seems like it's mostly word-count map-reduce replacements.

On 02/16/2015 01:32 PM, Charles Feduke wrote:

I cannot comment on the correctness of Python code. I will assume your caper_kv is keyed on something that uniquely identifies all the rows that make up the person's record, so your group by key makes sense, as does the map. (I will also assume all of the rows that comprise a single person's record will always fit in memory. If not, you will need another approach.) You should be able to get away with removing the localhost:9000 from your HDFS URL, i.e., hdfs:///sma/processJSON/people, and let your HDFS configuration for Spark supply the missing pieces.
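One hedged way to combine several keyed RDDs before building the per-person objects (shown in Scala to match the other examples in this digest; the table contents are toy data): cogroup gathers, for each person ID, the rows from every source RDD in a single pass.

import org.apache.spark.SparkContext._

// three of the seven tables, each already keyed by person id (illustrative)
val visits = sc.parallelize(Seq("p1" -> "visit-a", "p1" -> "visit-b"))
val labs   = sc.parallelize(Seq("p1" -> "lab-x"))
val meds   = sc.parallelize(Seq("p2" -> "med-7"))

// cogroup yields, per key, one Iterable of values per source RDD
val people = visits.cogroup(labs, meds).map { case (personId, (vs, ls, ms)) =>
  (personId, s"visits=${vs.size} labs=${ls.size} meds=${ms.size}")
}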
Re: Problem getting pyspark-cassandra and pyspark working
Oh, I don't know. Thanks a lot, Davies; gonna figure that out now.

On Mon, Feb 16, 2015 at 5:31 PM, Davies Liu dav...@databricks.com wrote:

It also needs the Cassandra jar: com.datastax.spark.connector.CassandraJavaUtil. Is it included in /spark/pyspark-cassandra-0.1-SNAPSHOT.jar?

On Mon, Feb 16, 2015 at 1:20 PM, Mohamed Lrhazi mohamed.lrh...@georgetown.edu wrote:

Yes, I am sure the system can't find the jar... but how do I fix that? My submit command includes the jar:

/spark/bin/spark-submit --py-files /spark/pyspark_cassandra.py --jars /spark/pyspark-cassandra-0.1-SNAPSHOT.jar --driver-class-path /spark/pyspark-cassandra-0.1-SNAPSHOT.jar /spark/test2.py

and the Spark output seems to indicate it is handling it:

15/02/16 05:58:46 INFO SparkContext: Added JAR file:/spark/pyspark-cassandra-0.1-SNAPSHOT.jar at http://10.212.55.42:56642/jars/pyspark-cassandra-0.1-SNAPSHOT.jar with timestamp 1424066326632

I don't really know what else I could try; any suggestions highly appreciated.

Thanks,
Mohamed.

On Mon, Feb 16, 2015 at 4:04 PM, Davies Liu dav...@databricks.com wrote:

It seems that the jar for cassandra is not loaded; you should have them in the classpath.

On Mon, Feb 16, 2015 at 12:08 PM, Mohamed Lrhazi mohamed.lrh...@georgetown.edu wrote:

Hello all, Trying the example code from this package (https://github.com/Parsely/pyspark-cassandra), I always get this error... Can you see what I am doing wrong? From googling around it seems to be that the jar is not found somehow... The Spark log shows the JAR was processed at least. Thank you so much. I am using spark-1.2.1-bin-hadoop2.4.tgz.

test2.py is simply:

from pyspark.context import SparkConf
from pyspark_cassandra import CassandraSparkContext, saveToCassandra
conf = SparkConf().setAppName("PySpark Cassandra Sample Driver")
conf.set("spark.cassandra.connection.host", "devzero")
sc = CassandraSparkContext(conf=conf)

[root@devzero spark]# /usr/local/bin/docker-enter spark-master bash -c "/spark/bin/spark-submit --py-files /spark/pyspark_cassandra.py --jars /spark/pyspark-cassandra-0.1-SNAPSHOT.jar --driver-class-path /spark/pyspark-cassandra-0.1-SNAPSHOT.jar /spark/test2.py"
...
15/02/16 05:58:45 INFO Slf4jLogger: Slf4jLogger started
15/02/16 05:58:45 INFO Remoting: Starting remoting
15/02/16 05:58:45 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@devzero:38917]
15/02/16 05:58:45 INFO Utils: Successfully started service 'sparkDriver' on port 38917.
15/02/16 05:58:45 INFO SparkEnv: Registering MapOutputTracker
15/02/16 05:58:45 INFO SparkEnv: Registering BlockManagerMaster
15/02/16 05:58:45 INFO DiskBlockManager: Created local directory at /tmp/spark-6cdca68b-edec-4a31-b3c1-a7e9d60191e7/spark-0e977468-6e31-4bba-959a-135d9ebda193
15/02/16 05:58:45 INFO MemoryStore: MemoryStore started with capacity 265.4 MB
15/02/16 05:58:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/02/16 05:58:46 INFO HttpFileServer: HTTP File server directory is /tmp/spark-af61f7f5-7c0e-412c-8352-263338335fa5/spark-10b3891f-0321-44fe-ba60-1a8c102fd647
15/02/16 05:58:46 INFO HttpServer: Starting HTTP Server
15/02/16 05:58:46 INFO Utils: Successfully started service 'HTTP file server' on port 56642.
15/02/16 05:58:46 INFO Utils: Successfully started service 'SparkUI' on port 4040.
15/02/16 05:58:46 INFO SparkUI: Started SparkUI at http://devzero:4040
15/02/16 05:58:46 INFO SparkContext: Added JAR file:/spark/pyspark-cassandra-0.1-SNAPSHOT.jar at http://10.212.55.42:56642/jars/pyspark-cassandra-0.1-SNAPSHOT.jar with timestamp 1424066326632
15/02/16 05:58:46 INFO Utils: Copying /spark/test2.py to /tmp/spark-e8cc013e-faae-4208-8bcd-0bb6c00b1b6c/spark-54f2c41d-ae35-4efd-860c-2e5c60979b4c/test2.py
15/02/16 05:58:46 INFO SparkContext: Added file file:/spark/test2.py at http://10.212.55.42:56642/files/test2.py with timestamp 1424066326633
15/02/16 05:58:46 INFO Utils: Copying /spark/pyspark_cassandra.py to /tmp/spark-e8cc013e-faae-4208-8bcd-0bb6c00b1b6c/spark-54f2c41d-ae35-4efd-860c-2e5c60979b4c/pyspark_cassandra.py
15/02/16 05:58:46 INFO SparkContext: Added file file:/spark/pyspark_cassandra.py at http://10.212.55.42:56642/files/pyspark_cassandra.py with timestamp 1424066326642
15/02/16 05:58:46 INFO Executor: Starting executor ID driver on host localhost
15/02/16 05:58:46 INFO AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@devzero:38917/user/HeartbeatReceiver
15/02/16 05:58:46 INFO NettyBlockTransferService: Server created on 32895
15/02/16 05:58:46 INFO BlockManagerMaster: Trying to register BlockManager
15/02/16 05:58:46 INFO
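A hedged sketch of a submit command with the connector classes on the classpath, following Davies's pointer (the spark-cassandra-connector assembly jar name and version are assumptions, not from the thread):

/spark/bin/spark-submit --py-files /spark/pyspark_cassandra.py --jars /spark/pyspark-cassandra-0.1-SNAPSHOT.jar,/spark/spark-cassandra-connector-assembly-1.2.0.jar --driver-class-path /spark/pyspark-cassandra-0.1-SNAPSHOT.jar:/spark/spark-cassandra-connector-assembly-1.2.0.jar /spark/test2.py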
OOM error
Hi all, I need some help with OutOfMemory errors in my application. I am using Spark 1.1.0 and my application is using the Java API. I am running my app on 25 EC2 m3.xlarge (4 cores, 15 GB memory) instances. The app only fails sometimes; lots of mapToPair tasks are failing. My app is configured to run 120 executors and executor memory is 2G. These are the various errors I see in my logs:

15/02/16 10:53:48 INFO storage.MemoryStore: Block broadcast_1 of size 4680 dropped from memory (free 257277829)
15/02/16 10:53:49 WARN channel.DefaultChannelPipeline: An exception was thrown by a user handler while handling an exception event ([id: 0x6e0138a3, /10.61.192.194:35196 => /10.164.164.228:49445] EXCEPTION: java.lang.OutOfMemoryError: Java heap space)
java.lang.OutOfMemoryError: Java heap space
  at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
  at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
  at org.jboss.netty.buffer.CompositeChannelBuffer.toByteBuffer(CompositeChannelBuffer.java:649)
  at org.jboss.netty.buffer.AbstractChannelBuffer.toByteBuffer(AbstractChannelBuffer.java:530)
  at org.jboss.netty.channel.socket.nio.SocketSendBufferPool.acquire(SocketSendBufferPool.java:77)
  at org.jboss.netty.channel.socket.nio.SocketSendBufferPool.acquire(SocketSendBufferPool.java:46)
  at org.jboss.netty.channel.socket.nio.AbstractNioWorker.write0(AbstractNioWorker.java:194)
  at org.jboss.netty.channel.socket.nio.AbstractNioWorker.writeFromTaskLoop(AbstractNioWorker.java:152)
  at org.jboss.netty.channel.socket.nio.AbstractNioChannel$WriteTask.run(AbstractNioChannel.java:335)
  at org.jboss.netty.channel.socket.nio.AbstractNioSelector.processTaskQueue(AbstractNioSelector.java:366)
  at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:290)
  at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:90)
  at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
  at java.lang.Thread.run(Thread.java:745)
15/02/16 10:53:49 WARN channel.DefaultChannelPipeline: An exception was thrown by a user handler while handling an exception event ([id: 0x2d0c1db1, /10.169.226.254:55790 => /10.164.164.228:49445] EXCEPTION: java.lang.OutOfMemoryError: Java heap space)
java.lang.OutOfMemoryError: Java heap space
  at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
  at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
  at org.jboss.netty.buffer.CompositeChannelBuffer.toByteBuffer(CompositeChannelBuffer.java:649)
  at org.jboss.netty.buffer.AbstractChannelBuffer.toByteBuffer(AbstractChannelBuffer.java:530)
  at org.jboss.netty.channel.socket.nio.SocketSendBufferPool.acquire(SocketSendBufferPool.java:77)
  at org.jboss.netty.channel.socket.nio.SocketSendBufferPool.acquire(SocketSendBufferPool.java:46)
  at org.jboss.netty.channel.socket.nio.AbstractNioWorker.write0(AbstractNioWorker.java:194)
  at org.jboss.netty.channel.socket.nio.AbstractNioWorker.writeFromTaskLoop(AbstractNioWorker.java:152)
  at org.jboss.netty.channel.socket.nio.AbstractNioChannel$WriteTask.run(AbstractNioChannel.java:335)
  at org.jboss.netty.channel.socket.nio.AbstractNioSelector.processTaskQueue(AbstractNioSelector.java:366)
  at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:290)
  at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:90)
  at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
  at java.lang.Thread.run(Thread.java:745)
15/02/16 10:53:50 WARN channel.DefaultChannelPipeline: An exception was thrown by a user handler while handling an exception event ([id: 0xd4211985, /10.181.125.52:60959 => /10.164.164.228:49445] EXCEPTION: java.lang.OutOfMemoryError: Java heap space)
java.lang.OutOfMemoryError: Java heap space
  at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
  at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
  at org.jboss.netty.buffer.CompositeChannelBuffer.toByteBuffer(CompositeChannelBuffer.java:649)
  at org.jboss.netty.buffer.AbstractChannelBuffer.toByteBuffer(AbstractChannelBuffer.java:530)
  at org.jboss.netty.channel.socket.nio.SocketSendBufferPool.acquire(SocketSendBufferPool.java:77)
  at org.jboss.netty.channel.socket.nio.SocketSendBufferPool.acquire(SocketSendBufferPool.java:46)
  at
Re: Problem getting pyspark-cassandra and pyspark working
It seems that the jar for Cassandra is not loaded; you should have it in the classpath. On Mon, Feb 16, 2015 at 12:08 PM, Mohamed Lrhazi mohamed.lrh...@georgetown.edu wrote: Hello all, Trying the example code from this package (https://github.com/Parsely/pyspark-cassandra), I always get this error... Can you see what I am doing wrong? From googling around it seems to be that the jar is not found somehow... The spark log shows the JAR was processed at least. Thank you so much. I am using spark-1.2.1-bin-hadoop2.4.tgz test2.py is simply: from pyspark.context import SparkConf from pyspark_cassandra import CassandraSparkContext, saveToCassandra conf = SparkConf().setAppName("PySpark Cassandra Sample Driver") conf.set("spark.cassandra.connection.host", "devzero") sc = CassandraSparkContext(conf=conf) [root@devzero spark]# /usr/local/bin/docker-enter spark-master bash -c /spark/bin/spark-submit --py-files /spark/pyspark_cassandra.py --jars /spark/pyspark-cassandra-0.1-SNAPSHOT.jar --driver-class-path /spark/pyspark-cassandra-0.1-SNAPSHOT.jar /spark/test2.py ... 15/02/16 05:58:45 INFO Slf4jLogger: Slf4jLogger started 15/02/16 05:58:45 INFO Remoting: Starting remoting 15/02/16 05:58:45 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@devzero:38917] 15/02/16 05:58:45 INFO Utils: Successfully started service 'sparkDriver' on port 38917. 15/02/16 05:58:45 INFO SparkEnv: Registering MapOutputTracker 15/02/16 05:58:45 INFO SparkEnv: Registering BlockManagerMaster 15/02/16 05:58:45 INFO DiskBlockManager: Created local directory at /tmp/spark-6cdca68b-edec-4a31-b3c1-a7e9d60191e7/spark-0e977468-6e31-4bba-959a-135d9ebda193 15/02/16 05:58:45 INFO MemoryStore: MemoryStore started with capacity 265.4 MB 15/02/16 05:58:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 15/02/16 05:58:46 INFO HttpFileServer: HTTP File server directory is /tmp/spark-af61f7f5-7c0e-412c-8352-263338335fa5/spark-10b3891f-0321-44fe-ba60-1a8c102fd647 15/02/16 05:58:46 INFO HttpServer: Starting HTTP Server 15/02/16 05:58:46 INFO Utils: Successfully started service 'HTTP file server' on port 56642. 15/02/16 05:58:46 INFO Utils: Successfully started service 'SparkUI' on port 4040.
15/02/16 05:58:46 INFO SparkUI: Started SparkUI at http://devzero:4040 15/02/16 05:58:46 INFO SparkContext: Added JAR file:/spark/pyspark-cassandra-0.1-SNAPSHOT.jar at http://10.212.55.42:56642/jars/pyspark-cassandra-0.1-SNAPSHOT.jar with timestamp 1424066326632 15/02/16 05:58:46 INFO Utils: Copying /spark/test2.py to /tmp/spark-e8cc013e-faae-4208-8bcd-0bb6c00b1b6c/spark-54f2c41d-ae35-4efd-860c-2e5c60979b4c/test2.py 15/02/16 05:58:46 INFO SparkContext: Added file file:/spark/test2.py at http://10.212.55.42:56642/files/test2.py with timestamp 1424066326633 15/02/16 05:58:46 INFO Utils: Copying /spark/pyspark_cassandra.py to /tmp/spark-e8cc013e-faae-4208-8bcd-0bb6c00b1b6c/spark-54f2c41d-ae35-4efd-860c-2e5c60979b4c/pyspark_cassandra.py 15/02/16 05:58:46 INFO SparkContext: Added file file:/spark/pyspark_cassandra.py at http://10.212.55.42:56642/files/pyspark_cassandra.py with timestamp 1424066326642 15/02/16 05:58:46 INFO Executor: Starting executor ID driver on host localhost 15/02/16 05:58:46 INFO AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@devzero:38917/user/HeartbeatReceiver 15/02/16 05:58:46 INFO NettyBlockTransferService: Server created on 32895 15/02/16 05:58:46 INFO BlockManagerMaster: Trying to register BlockManager 15/02/16 05:58:46 INFO BlockManagerMasterActor: Registering block manager localhost:32895 with 265.4 MB RAM, BlockManagerId(driver, localhost, 32895) 15/02/16 05:58:46 INFO BlockManagerMaster: Registered BlockManager 15/02/16 05:58:47 INFO SparkUI: Stopped Spark web UI at http://devzero:4040 15/02/16 05:58:47 INFO DAGScheduler: Stopping DAGScheduler 15/02/16 05:58:48 INFO MapOutputTrackerMasterActor: MapOutputTrackerActor stopped! 15/02/16 05:58:48 INFO MemoryStore: MemoryStore cleared 15/02/16 05:58:48 INFO BlockManager: BlockManager stopped 15/02/16 05:58:48 INFO BlockManagerMaster: BlockManagerMaster stopped 15/02/16 05:58:48 INFO SparkContext: Successfully stopped SparkContext 15/02/16 05:58:48 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon. 15/02/16 05:58:48 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports. 15/02/16 05:58:48 INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut down. Traceback (most recent call last): File /spark/test2.py, line 5, in module sc = CassandraSparkContext(conf=conf) File /spark/python/pyspark/context.py, line 105, in __init__ conf, jsc) File /spark/pyspark_cassandra.py, line 17, in _do_init self._jcsc = self._jvm.CassandraJavaUtil.javaFunctions(self._jsc) File
Unable to broadcast dimension tables with Spark SQL
Hi Experts, I have a large table with 54 million records (fact table), being joined with 6 small tables (dimension tables). The size on disk of the small tables is within 5k and the record count is in the range of 4 - 200. All the worker nodes have 32GB of RAM allocated for Spark. I have tried the below approaches and it looks like the small tables are not being broadcast, which is causing timeouts as expected and failure of the job. The reason for this, AFAIK, is that the small table is also getting shuffled and fits into a single node's partition. Then the large table is made to flow to the same node, which stays busy while all other nodes are idle. Note: The Spark version in use on the cluster as well as my local setup is 1.1.0. I also tried with Spark 1.2.0 in the local setup, however the queryPlan showed no change. 1. Broadcast the RDD before registering as table: val k = sqlContext.parquetFile(p.fileName) val t = sc.broadcast(k) t.value.registerTempTable(p.tableName) 2. Set the variable sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "1") 3. Added a limit to each small table before registering as table. I guess this gives the optimizer a way to compute statistics and determine that the other table is small enough for broadcast: sqlContext.sql("select * from a_nolim limit 7").registerTempTable("edu") also tried DSL style: a.limit(7).registerTempTable("edu") Tried explicit broadcasting of the tables as below: sc.broadcast(sqlContext.sql("select * from edu_nolim limit 7")).value.registerTempTable("edu") and tried DSL style with the broadcast done on the rdd as well 4. Used DSL style of join: val try2 = a1.join(cdemo, LeftOuter, Some(dem.key1.attr === ed.key1.attr)) 5. Ran the below command in Hive for all small tables: ANALYZE TABLE tableName COMPUTE STATISTICS noscan Please note, the application uses SQLContext and not the Hive context. Hence I ran the compute statistics out of the application from Hue's Hive editor. I am assuming the statistics are available in the metastore; however, I am not sure if Spark can fetch these statistics since I am not using the Hive context within the application. 6. Not sure if these are valid flags, but tried with them set anyway: sqlContext.setConf("spark.sql.planner.dimensionJoin", "true") sqlContext.setConf("spark.sql.planner.generatedDimensionJoin", "true") sqlContext.setConf("multiWayJoin", "true") sqlContext.setConf("turbo", "true") 7. Tried cacheTable for all small tables. This changes the query execution to InMemoryRelation instead of ParquetTableScan; however, the shuffle - Exchange (HashPartitioning [i1_education_cust_demo#29], 200) - remains. 8. Reduced the shuffle partition number with this parameter - sqlContext.setConf("spark.sql.shuffle.partitions", "8"). But this did not help. With all these attempts, the small tables are still getting shuffled, I guess. Below are the queryExecutions printed on every attempt and they have remained almost the same on every attempt: DSL Style execution plan (i.e.
rdd1.join(rdd2, LeftOuter, Some(rdd1.key.attr === rdd2.key.attr)) - DSL Style execution plan -- HashOuterJoin [education#18], [i1_education_cust_demo#29], LeftOuter, None Exchange (HashPartitioning [education#18], 200) ParquetTableScan [education#18,education_desc#19], (ParquetRelation C:/Sunita/eclipse/workspace/branch/trial/plsresources/plsbuyer/cg_pq_cdw_education, Some(Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml), org.apache.spark.sql.SQLContext@3bd36d4c, []), [] Exchange (HashPartitioning [i1_education_cust_demo#29], 200) ParquetTableScan [customer_id_cust_demo#20,age_dt_cust_demo#21,gndr_cd_cust_demo#22,hh_income_cust_demo#23,marital_status_cust_demo#24,ethnicity_cust_demo#25,length_of_residence_cust_demo#26,presence_of_young_adult_cust_demo#27,aged_parent_in_hh_cust_demo#28,i1_education_cust_demo#29], (ParquetRelation C:/Sunita/eclipse/workspace/branch/trial/plsresources/plsbuyer/cg_pq_cdw_cust_demo_dm_sample, Some(Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml), org.apache.spark.sql.SQLContext@3bd36d4c, []), [] SQL Style execution plan (i.e. sqlContext.sql("select a,b,c,d,e from t1 left outer join t2 on t1.a = t2.a")) -- Project [customer_id_cust_demo#20,i1_education_cust_demo#29,marital_status_cust_demo#24,hh_income_cust_demo#23,length_of_residence_cust_demo#26,ethnicity_cust_demo#25,gndr_cd_cust_demo#22,age_dt_cust_demo#21,presence_of_young_adult_cust_demo#27,aged_parent_in_hh_cust_demo#28,education_desc#19] HashOuterJoin [i1_education_cust_demo#29], [education#18], LeftOuter, None Exchange (HashPartitioning
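A note on attempt 2 above: spark.sql.autoBroadcastJoinThreshold is a size in bytes, and a table is broadcast only when its estimated size falls below that threshold, so setting it to 1 effectively disables broadcast joins rather than enabling them. A minimal sketch of the intended configuration, assuming a SQLContext named sqlContext and the table name from the post (the 10 MB figure is an illustrative choice, not a value from the thread):

    // Raise the broadcast threshold to ~10 MB (the value is in bytes) so that
    // any table whose estimated size is below it qualifies for a broadcast join.
    sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", (10 * 1024 * 1024).toString)
    // Caching a small table gives the planner in-memory size statistics,
    // which it may use when deciding whether the table is small enough.
    sqlContext.cacheTable("edu")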
Re: Re: Question about spark streaming+Flume
Hi Arush, With your code, I still didn't see the output "Received X flume events". bit1...@163.com From: bit1...@163.com Date: 2015-02-17 14:08 To: Arush Kharbanda CC: user Subject: Re: Re: Question about spark streaming+Flume Ok, you are missing a letter in foreachRDD.. let me proceed.. bit1...@163.com From: Arush Kharbanda Date: 2015-02-17 14:31 To: bit1...@163.com CC: user Subject: Re: Question about spark streaming+Flume Hi Can you try this val lines = FlumeUtils.createStream(ssc, "localhost", ) // Print out the count of events received from this server in each batch lines.count().map(cnt => "Received " + cnt + " flume events. at " + System.currentTimeMillis()) lines.forechRDD(_.foreach(println)) Thanks Arush On Tue, Feb 17, 2015 at 11:17 AM, bit1...@163.com bit1...@163.com wrote: Hi, I am trying the Spark Streaming + Flume example: 1. Code object SparkFlumeNGExample { def main(args : Array[String]) { val conf = new SparkConf().setAppName("SparkFlumeNGExample") val ssc = new StreamingContext(conf, Seconds(10)) val lines = FlumeUtils.createStream(ssc, "localhost", ) // Print out the count of events received from this server in each batch lines.count().map(cnt => "Received " + cnt + " flume events. at " + System.currentTimeMillis()).print() ssc.start() ssc.awaitTermination(); } } 2. I submit the application with the following sh: ./spark-submit --deploy-mode client --name SparkFlumeEventCount --master spark://hadoop.master:7077 --executor-memory 512M --total-executor-cores 2 --class spark.examples.streaming.SparkFlumeNGWordCount spark-streaming-flume.jar When I write data to Flume, I only notice the following console information indicating that input is added: storage.BlockManagerInfo: Added input-0-1424151807400 in memory on localhost:39338 (size: 1095.0 B, free: 267.2 MB) 15/02/17 00:43:30 INFO scheduler.JobScheduler: Added jobs for time 142415181 ms 15/02/17 00:43:40 INFO scheduler.JobScheduler: Added jobs for time 142415182 ms 15/02/17 00:43:40 INFO scheduler.JobScheduler: Added jobs for time 142415187 ms But I didn't see the output from the code: "Received X flume events". I have no idea where the problem is, any ideas? Thanks -- Arush Kharbanda || Technical Teamlead ar...@sigmoidanalytics.com || www.sigmoidanalytics.com
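For readers following this thread, here is a minimal, self-contained sketch of the counting pipeline under discussion, with the output operation spelled out and the foreachRDD typo corrected. The host and port 9999 are placeholders (the port was elided in the messages above):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.flume.FlumeUtils

    object SparkFlumeNGExample {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("SparkFlumeNGExample")
        val ssc = new StreamingContext(conf, Seconds(10))
        // Receive Flume events; host and port must match the Flume Avro sink.
        val lines = FlumeUtils.createStream(ssc, "localhost", 9999)
        // count() and map() are transformations; print() is the output
        // operation that actually triggers execution of each batch.
        lines.count()
          .map(cnt => "Received " + cnt + " flume events at " + System.currentTimeMillis())
          .print()
        ssc.start()
        ssc.awaitTermination()
      }
    }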
Identify the performance bottleneck from hardware perspective
Hi there, I am trying to scale up the data size that my application is handling. This application is running on a cluster with 16 slave nodes. Each slave node has 60GB memory. It is running in standalone mode. The data is coming from HDFS, which is also in the same local network. In order to understand how my program is running, I also have Ganglia installed on the cluster. From previous runs, I know the stage taking the longest time to run is counting word pairs (my RDD consists of sentences from a corpus). My goal is to identify the bottleneck of my application, then modify my program or hardware configuration accordingly. Unfortunately, I didn't find much information on Spark monitoring and optimization topics. Reynold Xin gave a great talk at Spark Summit 2014 on application tuning from the tasks perspective. Basically, his focus is on tasks that are oddly slower than the average. However, it didn't solve my problem because there are no such tasks that run much slower than the others in my case. So I tried to identify the bottleneck from the hardware perspective. I want to know what the limitation of the cluster is. I think if the executors are running hard, either CPU, memory or network bandwidth (or maybe some combination) is hitting the roof. But Ganglia reports that the CPU utilization of the cluster is no more than 50%; network utilization is high for several seconds at the beginning, then drops close to 0. From the Spark UI, I can see that the node with maximum memory usage is consuming around 6GB, while spark.executor.memory is set to 20GB. I am very confused that the program is not running fast enough while hardware resources are not in short supply. Could you please give me some hints about what decides the performance of a Spark application from the hardware perspective? Thanks! Julaiti -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Identify-the-performance-bottleneck-from-hardware-prospective-tp21684.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
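As a general note on the word-pair counting stage described above, the number of partitions in the reduce step bounds how many tasks can run in parallel, so underutilized CPUs sometimes just mean too few partitions. A sketch of what an explicit partition count looks like; the pairing logic, input path, and the value 256 are illustrative assumptions, not details from the original post:

    import org.apache.spark.SparkContext._ // PairRDDFunctions implicits (Spark 1.x)

    val sentences = sc.textFile("hdfs:///corpus")
    val pairCounts = sentences
      .flatMap { s =>
        val words = s.split("\\s+")
        // Emit each unordered word pair in the sentence once.
        for (a <- words; b <- words if a < b) yield ((a, b), 1)
      }
      .reduceByKey(_ + _, 256) // 256 reduce partitions; tune to a small multiple of total cores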
Re: How to retrieve the value from sql.row by column name
BTW we merged this today: https://github.com/apache/spark/pull/4640 This should allow us in the future to address columns by name in a Row. On Mon, Feb 16, 2015 at 11:39 AM, Michael Armbrust mich...@databricks.com wrote: I can unpack the code snippet a bit: caper.select('ran_id) is the same as saying SELECT ran_id FROM table in SQL. It's always a good idea to explicitly request the columns you need right before using them. That way you are tolerant of any changes to the schema that might happen upstream. The next part .map { case Row(ranId: String) => ... } is doing an extraction to pull out the values of the row into typed variables. This is the same as doing .map(row => row(0).asInstanceOf[String]) or .map(row => row.getString(0)), but I find this syntax easier to read since it lines up nicely with the select clause that comes right before it. It's also less verbose, especially when pulling out a bunch of columns. Regarding the differences between python and java/scala, part of this is just due to the nature of these languages. Since java/scala are statically typed, you will always have to explicitly say the type of the column you are extracting (the bonus here is they are much faster than python due to optimizations this strictness allows). However, since it's already a little more verbose, we decided not to have the more expensive ability to look up columns in a row by name, and instead went with a faster ordinal-based API. We could revisit this, but it's not currently something we are planning to change. Michael On Mon, Feb 16, 2015 at 11:04 AM, Eric Bell e...@ericjbell.com wrote: I am just learning scala so I don't actually understand what your code snippet is doing, but thank you, I will learn more so I can figure it out. I am new to all of this and still trying to make the mental shift from normal programming to distributed programming, but it seems to me that the row object would know the schema object that it came from and be able to ask its schema to transform a name to a column number. Am I missing something, or is this just a matter of time constraints and this one just hasn't gotten into the queue yet? Barring that, do the schema classes provide methods for doing this? I've looked and didn't see anything. I've just discovered that the python implementation for SchemaRDD does in fact allow for referencing by name and column. Why is this provided in the python implementation but not the scala or java implementations? Thanks, --eric On 02/16/2015 10:46 AM, Michael Armbrust wrote: For efficiency the row objects don't contain the schema, so you can't get the column by name directly. I usually do a select followed by pattern matching. Something like the following: caper.select('ran_id).map { case Row(ranId: String) => } On Mon, Feb 16, 2015 at 8:54 AM, Eric Bell e...@ericjbell.com wrote: Is it possible to reference a column from a SchemaRDD using the column's name instead of its number?
For example, let's say I've created a SchemaRDD from an avro file: val sqlContext = new SQLContext(sc) import sqlContext._ val caper = sqlContext.avroFile("hdfs://localhost:9000/sma/raw_avro/caper") caper.registerTempTable("caper") scala> caper res20: org.apache.spark.sql.SchemaRDD = SchemaRDD[0] at RDD at SchemaRDD.scala:108 == Query Plan == == Physical Plan == PhysicalRDD [ADMDISP#0,age#1,AMBSURG#2,apptdt_skew#3,APPTSTAT#4,APPTTYPE#5,ASSGNDUR#6,CANCSTAT#7,CAPERSTAT#8,COMPLAINT#9,CPT_1#10,CPT_10#11,CPT_11#12,CPT_12#13,CPT_13#14,CPT_2#15,CPT_3#16,CPT_4#17,CPT_5#18,CPT_6#19,CPT_7#20,CPT_8#21,CPT_9#22,CPTDX_1#23,CPTDX_10#24,CPTDX_11#25,CPTDX_12#26,CPTDX_13#27,CPTDX_2#28,CPTDX_3#29,CPTDX_4#30,CPTDX_5#31,CPTDX_6#32,CPTDX_7#33,CPTDX_8#34,CPTDX_9#35,CPTMOD1_1#36,CPTMOD1_10#37,CPTMOD1_11#38,CPTMOD1_12#39,CPTMOD1_13#40,CPTMOD1_2#41,CPTMOD1_3#42,CPTMOD1_4#43,CPTMOD1_5#44,CPTMOD1_6#45,CPTMOD1_7#46,CPTMOD1_8#47,CPTMOD1_9#48,CPTMOD2_1#49,CPTMOD2_10#50,CPTMOD2_11#51,CPTMOD2_12#52,CPTMOD2_13#53,CPTMOD2_2#54,CPTMOD2_3#55,CPTMOD2_4#56,CPTMOD... scala> Now I want to access fields, and of course the normal thing to do is to use a field name, not a field number. scala> val kv = caper.map(r => (r.ran_id, r)) <console>:23: error: value ran_id is not a member of org.apache.spark.sql.Row val kv = caper.map(r => (r.ran_id, r)) How do I do this?
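A compact sketch of the select-then-match pattern Michael describes, applied in a session like the one above (with import sqlContext._ in scope); the column types and the choice of ADMDISP are guesses for illustration, not taken from the avro schema:

    import org.apache.spark.sql.Row

    // Select only the columns needed, then pattern match to pull out typed
    // values; the order in the match follows the order in the select.
    val kv = caper.select('ran_id, 'ADMDISP).map {
      case Row(ranId: String, admdisp: String) => (ranId, admdisp)
    }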
PySpark and Cassandra
Hi, I'm trying to connect to Cassandra through PySpark using the spark-cassandra-connector from datastax based on the work of Mike Sukmanowsky. I can use Spark and Cassandra through the datastax connector in Scala just fine. Where things fail in PySpark is that an exception is raised in org.apache.spark.api.python.PythonRDD.writeIteratorToStream(...) with the message 'Unexpected element type com.datastax.spark.connector.japi.CassandraRow'. So just to be sure: is it only possible to communicate between a Python Spark program and the rest of the Spark ecosystem through binary or UTF-8 strings? Is there no way to communicate a richer object with at least types like a float, etc.? Cheers, Frens
Re: OOM error
Increase your executor memory. Also, you can play around with increasing the number of partitions/parallelism, etc. Thanks Best Regards On Tue, Feb 17, 2015 at 3:39 AM, Harshvardhan Chauhan ha...@gumgum.com wrote: Hi All, I need some help with Out Of Memory errors in my application. I am using Spark 1.1.0 and my application is using the Java API. I am running my app on 25 EC2 m3.xlarge (4 cores, 15GB memory) instances. The app only fails sometimes. Lots of mapToPair tasks are failing. My app is configured to run 120 executors and executor memory is 2G. These are the various errors I see in my logs. 15/02/16 10:53:48 INFO storage.MemoryStore: Block broadcast_1 of size 4680 dropped from memory (free 257277829) 15/02/16 10:53:49 WARN channel.DefaultChannelPipeline: An exception was thrown by a user handler while handling an exception event ([id: 0x6e0138a3, /10.61.192.194:35196 = /10.164.164.228:49445] EXCEPTION: java.lang.OutOfMemoryError: Java heap space) java.lang.OutOfMemoryError: Java heap space at java.nio.HeapByteBuffer.init(HeapByteBuffer.java:57) at java.nio.ByteBuffer.allocate(ByteBuffer.java:331) at org.jboss.netty.buffer.CompositeChannelBuffer.toByteBuffer(CompositeChannelBuffer.java:649) at org.jboss.netty.buffer.AbstractChannelBuffer.toByteBuffer(AbstractChannelBuffer.java:530) at org.jboss.netty.channel.socket.nio.SocketSendBufferPool.acquire(SocketSendBufferPool.java:77) at org.jboss.netty.channel.socket.nio.SocketSendBufferPool.acquire(SocketSendBufferPool.java:46) at org.jboss.netty.channel.socket.nio.AbstractNioWorker.write0(AbstractNioWorker.java:194) at org.jboss.netty.channel.socket.nio.AbstractNioWorker.writeFromTaskLoop(AbstractNioWorker.java:152) at org.jboss.netty.channel.socket.nio.AbstractNioChannel$WriteTask.run(AbstractNioChannel.java:335) at org.jboss.netty.channel.socket.nio.AbstractNioSelector.processTaskQueue(AbstractNioSelector.java:366) at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:290) at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:90) at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) 15/02/16 10:53:49 WARN channel.DefaultChannelPipeline: An exception was thrown by a user handler while handling an exception event ([id: 0x2d0c1db1, /10.169.226.254:55790 = /10.164.164.228:49445] EXCEPTION: java.lang.OutOfMemoryError: Java heap space) java.lang.OutOfMemoryError: Java heap space at java.nio.HeapByteBuffer.init(HeapByteBuffer.java:57) at java.nio.ByteBuffer.allocate(ByteBuffer.java:331) at org.jboss.netty.buffer.CompositeChannelBuffer.toByteBuffer(CompositeChannelBuffer.java:649) at org.jboss.netty.buffer.AbstractChannelBuffer.toByteBuffer(AbstractChannelBuffer.java:530) at org.jboss.netty.channel.socket.nio.SocketSendBufferPool.acquire(SocketSendBufferPool.java:77) at org.jboss.netty.channel.socket.nio.SocketSendBufferPool.acquire(SocketSendBufferPool.java:46) at org.jboss.netty.channel.socket.nio.AbstractNioWorker.write0(AbstractNioWorker.java:194) at org.jboss.netty.channel.socket.nio.AbstractNioWorker.writeFromTaskLoop(AbstractNioWorker.java:152) at org.jboss.netty.channel.socket.nio.AbstractNioChannel$WriteTask.run(AbstractNioChannel.java:335) at org.jboss.netty.channel.socket.nio.AbstractNioSelector.processTaskQueue(AbstractNioSelector.java:366) at
org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:290) at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:90) at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) 15/02/16 10:53:50 WARN channel.DefaultChannelPipeline: An exception was thrown by a user handler while handling an exception event ([id: 0xd4211985, /10.181.125.52:60959 = /10.164.164.228:49445] EXCEPTION: java.lang.OutOfMemoryError: Java heap space) java.lang.OutOfMemoryError: Java heap space at java.nio.HeapByteBuffer.init(HeapByteBuffer.java:57) at java.nio.ByteBuffer.allocate(ByteBuffer.java:331) at org.jboss.netty.buffer.CompositeChannelBuffer.toByteBuffer(CompositeChannelBuffer.java:649) at org.jboss.netty.buffer.AbstractChannelBuffer.toByteBuffer(AbstractChannelBuffer.java:530) at
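A minimal sketch of the kind of tuning suggested in the reply above, written in Scala although the original app uses the Java API (the concrete values are illustrative, not recommendations from the thread):

    import org.apache.spark.SparkConf

    // More memory per executor, and more partitions so each task handles a
    // smaller slice of data, which lowers the peak heap usage per task.
    val conf = new SparkConf()
      .set("spark.executor.memory", "4g")      // was 2g in the post
      .set("spark.default.parallelism", "480") // several tasks per executor core
    // A specific RDD can also be split more finely before the heavy stage:
    // rdd.repartition(480)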
Re: spark-local dir running out of space during long ALS run
For the last question, you can trigger GC in the JVM from Python by: sc._jvm.System.gc() On Mon, Feb 16, 2015 at 4:08 PM, Antony Mayi antonym...@yahoo.com.invalid wrote: thanks, that looks promising but I can't find any reference giving me more details - can you please point me to something? Also is it possible to force GC from pyspark (as I am using pyspark)? thanks, Antony. On Monday, 16 February 2015, 21:05, Tathagata Das tathagata.das1...@gmail.com wrote: Correct, brute force clean up is not useful. Since Spark 1.0, Spark can do automatic cleanup of files based on which RDDs are used/garbage collected by the JVM. That would be the best way, but it depends on the JVM GC characteristics. If you force a GC periodically in the driver, that might help you get rid of files in the workers that are not needed. TD On Mon, Feb 16, 2015 at 12:27 AM, Antony Mayi antonym...@yahoo.com.invalid wrote: spark.cleaner.ttl is not the right way - it seems to be really designed for streaming. Although it keeps the disk usage under control, it also causes loss of rdds and broadcasts that are required later, leading to a crash. Is there any other way? thanks, Antony. On Sunday, 15 February 2015, 21:42, Antony Mayi antonym...@yahoo.com wrote: spark.cleaner.ttl ? On Sunday, 15 February 2015, 18:23, Antony Mayi antonym...@yahoo.com wrote: Hi, I am running a bigger ALS on spark 1.2.0 on yarn (cdh 5.3.0) - the ALS is using about 3 billion ratings and I am doing several trainImplicit() runs in a loop within one spark session. I have a four-node cluster with 3TB of disk space on each. Before starting the job, less than 8% of the disk space is used. While the ALS is running, I can see the disk usage rapidly growing, mainly because of files being stored under yarn/local/usercache/user/appcache/application_XXX_YYY/spark-local-ZZZ-AAA. After about 10 hours the disk usage hits 90% and yarn kills the particular containers. Am I missing some cleanup somewhere while looping over the several trainImplicit() calls? Taking 4*3TB of disk space seems immense. thanks for any help, Antony.
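Since Spark 1.0 the cleanup TD describes is reference-based: shuffle files are removed only after the corresponding driver-side objects are garbage collected. A sketch of the looping pattern with an explicit GC between runs, in Scala (the pyspark equivalent of the GC call is the sc._jvm.System.gc() shown above; the rank and iteration values are placeholders):

    import org.apache.spark.mllib.recommendation.{ALS, Rating}
    import org.apache.spark.rdd.RDD

    def runAll(ratings: RDD[Rating]): Unit = {
      for (run <- 1 to 5) {
        val model = ALS.trainImplicit(ratings, 10, 10) // rank = 10, iterations = 10
        // ... evaluate or save the model for this run ...
        // Once the reference goes out of scope, a forced GC lets Spark's
        // cleaner delete the run's shuffle files on the workers.
        System.gc()
      }
    }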
Re: Problem getting pyspark-cassandra and pyspark working
Will do. Thanks a lot. On Mon, Feb 16, 2015 at 7:20 PM, Davies Liu dav...@databricks.com wrote: Can you try the example in pyspark-cassandra? If not, you could create an issue there. On Mon, Feb 16, 2015 at 4:07 PM, Mohamed Lrhazi mohamed.lrh...@georgetown.edu wrote: So I tried building the connector from: https://github.com/datastax/spark-cassandra-connector which seems to include the java class referenced in the error message: [root@devzero spark]# unzip -l spark-cassandra-connector/spark-cassandra-connector-java/target/scala-2.10/spark-cassandra-connector-java-assembly-1.2.0-SNAPSHOT.jar |grep CassandraJavaUtil 14612 02-16-2015 23:25 com/datastax/spark/connector/japi/CassandraJavaUtil.class [root@devzero spark]# When I try running my spark test job, I still get the exact same error, even though both my jars seem to have been processed by Spark. ... 15/02/17 00:00:45 INFO SparkUI: Started SparkUI at http://devzero:4040 15/02/17 00:00:45 INFO SparkContext: Added JAR file:/spark/pyspark-cassandra-0.1-SNAPSHOT.jar at http://10.212.55.42:36929/jars/pyspark-cassandra-0.1-SNAPSHOT.jar with timestamp 1424131245595 15/02/17 00:00:45 INFO SparkContext: Added JAR file:/spark/spark-cassandra-connector-java-assembly-1.2.0-SNAPSHOT.jar at http://10.212.55.42:36929/jars/spark-cassandra-connector-java-assembly-1.2.0-SNAPSHOT.jar with timestamp 1424131245623 15/02/17 00:00:45 INFO Utils: Copying /spark/test2.py to /tmp/spark-8588b528-d016-42ac-aa7c-e8cf07c1b659/spark-ae3141dd-ae6c-4e99-b7c8-f97ccb3fd8e5/test2.py 15/02/17 00:00:45 INFO SparkContext: Added file file:/spark/test2.py at http://10.212.55.42:36929/files/test2.py with timestamp 1424131245624 15/02/17 00:00:45 INFO Utils: Copying /spark/pyspark_cassandra.py to /tmp/spark-8588b528-d016-42ac-aa7c-e8cf07c1b659/spark-ae3141dd-ae6c-4e99-b7c8-f97ccb3fd8e5/pyspark_cassandra.py 15/02/17 00:00:45 INFO SparkContext: Added file file:/spark/pyspark_cassandra.py at http://10.212.55.42:36929/files/pyspark_cassandra.py with timestamp 1424131245633 15/02/17 00:00:45 INFO Executor: Starting executor ID driver on host localhost 15/ 15/02/17 00:00:47 INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut down. Traceback (most recent call last): File /spark/test2.py, line 5, in module sc = CassandraSparkContext(conf=conf) File /spark/python/pyspark/context.py, line 105, in __init__ conf, jsc) File /spark/pyspark_cassandra.py, line 17, in _do_init self._jcsc = self._jvm.CassandraJavaUtil.javaFunctions(self._jsc) File /spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py, line 726, in __getattr__ py4j.protocol.Py4JError: Trying to call a package. Am I building the wrong connector jar? Or using the wrong jar? Thanks a lot, Mohamed. On Mon, Feb 16, 2015 at 5:46 PM, Mohamed Lrhazi mohamed.lrh...@georgetown.edu wrote: Oh, I don't know. Thanks a lot Davies, gonna figure that out now On Mon, Feb 16, 2015 at 5:31 PM, Davies Liu dav...@databricks.com wrote: It also needs the Cassandra jar: com.datastax.spark.connector.CassandraJavaUtil Is it included in /spark/pyspark-cassandra-0.1-SNAPSHOT.jar ? On Mon, Feb 16, 2015 at 1:20 PM, Mohamed Lrhazi mohamed.lrh...@georgetown.edu wrote: Yes, I am sure the system can't find the jar.. but how do I fix that...
My submit command includes the jar: /spark/bin/spark-submit --py-files /spark/pyspark_cassandra.py --jars /spark/pyspark-cassandra-0.1-SNAPSHOT.jar --driver-class-path /spark/pyspark-cassandra-0.1-SNAPSHOT.jar /spark/test2.py and the spark output seems to indicate it is handling it: 15/02/16 05:58:46 INFO SparkContext: Added JAR file:/spark/pyspark-cassandra-0.1-SNAPSHOT.jar at http://10.212.55.42:56642/jars/pyspark-cassandra-0.1-SNAPSHOT.jar with timestamp 1424066326632 I don't really know what else I could try; any suggestions highly appreciated. Thanks, Mohamed. On Mon, Feb 16, 2015 at 4:04 PM, Davies Liu dav...@databricks.com wrote: It seems that the jar for Cassandra is not loaded; you should have it in the classpath. On Mon, Feb 16, 2015 at 12:08 PM, Mohamed Lrhazi mohamed.lrh...@georgetown.edu wrote: Hello all, Trying the example code from this package (https://github.com/Parsely/pyspark-cassandra), I always get this error... Can you see what I am doing wrong? From googling around it seems to be that the jar is not found somehow... The spark log shows the JAR was processed at least. Thank you so much. I am using spark-1.2.1-bin-hadoop2.4.tgz test2.py is simply: from pyspark.context import SparkConf from pyspark_cassandra import CassandraSparkContext, saveToCassandra conf = SparkConf().setAppName("PySpark Cassandra Sample Driver")
Re: Use of nscala-time within spark-shell
What is your scala version used to build Spark? It seems your nscala-time library scala version is 2.11, and default Spark scala version is 2.10. On Tue Feb 17 2015 at 1:51:47 AM Hammam CHAMSI hscha...@hotmail.com wrote: Hi All, Thanks in advance for your help. I have timestamp which I need to convert to datetime using scala. A folder contains the three needed jar files: joda-convert-1.5.jar joda-time-2.4.jar nscala-time_2.11-1.8.0.jar Using scala REPL and adding the jars: scala -classpath *.jar I can use nscala-time like following: scala import com.github.nscala_time.time.Imports._ import com.github.nscala_time.time.Imports._ scala import org.joda._ import org.joda._ scala DateTime.now res0: org.joda.time.DateTime = 2015-02-12T15:51:46.928+01:00 But when i try to use spark-shell: ADD_JARS=/home/scala_test_class/nscala-time_2.11-1.8.0.jar,/home/scala_test_class/joda-time-2.4.jar,/home/scala_test_class/joda-convert-1.5.jar /usr/local/spark/bin/spark-shell --master local --driver-memory 2g --executor-memory 2g --executor-cores 1 It successfully imports the jars: scala import com.github.nscala_time.time.Imports._ import com.github.nscala_time.time.Imports._ scala import org.joda._ import org.joda._ but fails using them scala DateTime.now java.lang.NoSuchMethodError: scala.Predef$.$conforms()Lscala/Predef$$less$colon$less; at com.github.nscala_time.time.LowPriorityOrderingImplicits$class.ReadableInstantOrdering(Implicits.scala:69) at com.github.nscala_time.time.Imports$.ReadableInstantOrdering(Imports.scala:20) at com.github.nscala_time.time.OrderingImplicits$class.$init$(Implicits.scala:61) at com.github.nscala_time.time.Imports$.init(Imports.scala:20) at com.github.nscala_time.time.Imports$.clinit(Imports.scala) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:17) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:22) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:24) at $iwC$$iwC$$iwC$$iwC$$iwC.init(console:26) at $iwC$$iwC$$iwC$$iwC.init(console:28) at $iwC$$iwC$$iwC.init(console:30) at $iwC$$iwC.init(console:32) at $iwC.init(console:34) at init(console:36) at .init(console:40) at .clinit(console) at .init(console:7) at .clinit(console) at $print(console) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852) at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1125) at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:674) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:705) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:669) at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:828) at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:873) at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:785) at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:628) at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:636) at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:641) at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:968) at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916) at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916) at 
scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135) at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:916) at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1011) at org.apache.spark.repl.Main$.main(Main.scala:31) at org.apache.spark.repl.Main.main(Main.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Your help is very appreciated, Regards, Hammam
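In other words, a spark-shell built against Scala 2.10 needs the 2.10 build of the library. A sketch of the matching sbt dependency, using the version from the jars named above (the %% operator appends the project's Scala binary version, so this resolves nscala-time_2.10 rather than the 2.11 jar used above):

    scalaVersion := "2.10.4"

    // Resolves nscala-time_2.10-1.8.0.jar, matching a 2.10-built Spark.
    libraryDependencies += "com.github.nscala-time" %% "nscala-time" % "1.8.0"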
Re: Problem getting pyspark-cassandra and pyspark working
So I tried building the connector from: https://github.com/datastax/spark-cassandra-connector which seems to include the java class referenced in the error message: [root@devzero spark]# unzip -l spark-cassandra-connector/spark-cassandra-connector-java/target/scala-2.10/spark-cassandra-connector-java-assembly-1.2.0-SNAPSHOT.jar |grep CassandraJavaUtil 14612 02-16-2015 23:25 com/datastax/spark/connector/japi/CassandraJavaUtil.class [root@devzero spark]# When I try running my spark test job, I still get the exact same error, even though both my jars seem to have been processed by Spark. ... 15/02/17 00:00:45 INFO SparkUI: Started SparkUI at http://devzero:4040 15/02/17 00:00:45 INFO SparkContext: Added JAR file:/spark/pyspark-cassandra-0.1-SNAPSHOT.jar at http://10.212.55.42:36929/jars/pyspark-cassandra-0.1-SNAPSHOT.jar with timestamp 1424131245595 15/02/17 00:00:45 INFO SparkContext: Added JAR file:/spark/spark-cassandra-connector-java-assembly-1.2.0-SNAPSHOT.jar at http://10.212.55.42:36929/jars/spark-cassandra-connector-java-assembly-1.2.0-SNAPSHOT.jar with timestamp 1424131245623 15/02/17 00:00:45 INFO Utils: Copying /spark/test2.py to /tmp/spark-8588b528-d016-42ac-aa7c-e8cf07c1b659/spark-ae3141dd-ae6c-4e99-b7c8-f97ccb3fd8e5/test2.py 15/02/17 00:00:45 INFO SparkContext: Added file file:/spark/test2.py at http://10.212.55.42:36929/files/test2.py with timestamp 1424131245624 15/02/17 00:00:45 INFO Utils: Copying /spark/pyspark_cassandra.py to /tmp/spark-8588b528-d016-42ac-aa7c-e8cf07c1b659/spark-ae3141dd-ae6c-4e99-b7c8-f97ccb3fd8e5/pyspark_cassandra.py 15/02/17 00:00:45 INFO SparkContext: Added file file:/spark/pyspark_cassandra.py at http://10.212.55.42:36929/files/pyspark_cassandra.py with timestamp 1424131245633 15/02/17 00:00:45 INFO Executor: Starting executor ID driver on host localhost 15/ 15/02/17 00:00:47 INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut down. Traceback (most recent call last): File /spark/test2.py, line 5, in module sc = CassandraSparkContext(conf=conf) File /spark/python/pyspark/context.py, line 105, in __init__ conf, jsc) File /spark/pyspark_cassandra.py, line 17, in _do_init self._jcsc = self._jvm.CassandraJavaUtil.javaFunctions(self._jsc) File /spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py, line 726, in __getattr__ py4j.protocol.Py4JError: Trying to call a package. Am I building the wrong connector jar? Or using the wrong jar? Thanks a lot, Mohamed. On Mon, Feb 16, 2015 at 5:46 PM, Mohamed Lrhazi mohamed.lrh...@georgetown.edu wrote: Oh, I don't know. Thanks a lot Davies, gonna figure that out now On Mon, Feb 16, 2015 at 5:31 PM, Davies Liu dav...@databricks.com wrote: It also needs the Cassandra jar: com.datastax.spark.connector.CassandraJavaUtil Is it included in /spark/pyspark-cassandra-0.1-SNAPSHOT.jar ? On Mon, Feb 16, 2015 at 1:20 PM, Mohamed Lrhazi mohamed.lrh...@georgetown.edu wrote: Yes, I am sure the system can't find the jar.. but how do I fix that...
On Mon, Feb 16, 2015 at 4:04 PM, Davies Liu dav...@databricks.com wrote: It seems that the jar for Cassandra is not loaded; you should have it in the classpath. On Mon, Feb 16, 2015 at 12:08 PM, Mohamed Lrhazi mohamed.lrh...@georgetown.edu wrote: Hello all, Trying the example code from this package (https://github.com/Parsely/pyspark-cassandra), I always get this error... Can you see what I am doing wrong? From googling around it seems to be that the jar is not found somehow... The spark log shows the JAR was processed at least. Thank you so much. I am using spark-1.2.1-bin-hadoop2.4.tgz test2.py is simply: from pyspark.context import SparkConf from pyspark_cassandra import CassandraSparkContext, saveToCassandra conf = SparkConf().setAppName("PySpark Cassandra Sample Driver") conf.set("spark.cassandra.connection.host", "devzero") sc = CassandraSparkContext(conf=conf) [root@devzero spark]# /usr/local/bin/docker-enter spark-master bash -c /spark/bin/spark-submit --py-files /spark/pyspark_cassandra.py --jars /spark/pyspark-cassandra-0.1-SNAPSHOT.jar --driver-class-path /spark/pyspark-cassandra-0.1-SNAPSHOT.jar /spark/test2.py ... 15/02/16 05:58:45 INFO Slf4jLogger: Slf4jLogger started 15/02/16 05:58:45 INFO Remoting:
Re: spark-local dir running out of space during long ALS run
thanks, that looks promising but I can't find any reference giving me more details - can you please point me to something? Also is it possible to force GC from pyspark (as I am using pyspark)? thanks, Antony. On Monday, 16 February 2015, 21:05, Tathagata Das tathagata.das1...@gmail.com wrote: Correct, brute force clean up is not useful. Since Spark 1.0, Spark can do automatic cleanup of files based on which RDDs are used/garbage collected by the JVM. That would be the best way, but it depends on the JVM GC characteristics. If you force a GC periodically in the driver, that might help you get rid of files in the workers that are not needed. TD On Mon, Feb 16, 2015 at 12:27 AM, Antony Mayi antonym...@yahoo.com.invalid wrote: spark.cleaner.ttl is not the right way - it seems to be really designed for streaming. Although it keeps the disk usage under control, it also causes loss of rdds and broadcasts that are required later, leading to a crash. Is there any other way? thanks, Antony. On Sunday, 15 February 2015, 21:42, Antony Mayi antonym...@yahoo.com wrote: spark.cleaner.ttl ? On Sunday, 15 February 2015, 18:23, Antony Mayi antonym...@yahoo.com wrote: Hi, I am running a bigger ALS on spark 1.2.0 on yarn (cdh 5.3.0) - the ALS is using about 3 billion ratings and I am doing several trainImplicit() runs in a loop within one spark session. I have a four-node cluster with 3TB of disk space on each. Before starting the job, less than 8% of the disk space is used. While the ALS is running, I can see the disk usage rapidly growing, mainly because of files being stored under yarn/local/usercache/user/appcache/application_XXX_YYY/spark-local-ZZZ-AAA. After about 10 hours the disk usage hits 90% and yarn kills the particular containers. Am I missing some cleanup somewhere while looping over the several trainImplicit() calls? Taking 4*3TB of disk space seems immense. thanks for any help, Antony.
Re: Shuffle on joining two RDDs
This will be fixed by https://github.com/apache/spark/pull/4629 On Fri, Feb 13, 2015 at 10:43 AM, Imran Rashid iras...@cloudera.com wrote: yeah I thought the same thing at first too, I suggested something equivalent w/ preservesPartitioning = true, but that isn't enough. The join is done by union-ing the two transformed rdds, which is very different from the way it works under the hood in scala to enable narrow dependencies. It really needs a bigger change to pyspark. I filed this issue: https://issues.apache.org/jira/browse/SPARK-5785 (and the somewhat related issue about documentation: https://issues.apache.org/jira/browse/SPARK-5786) Partitioning should still work in pyspark, you still need some notion of distributing work, and the pyspark functions have a partitionFunc to decide that. But, I am not an authority on pyspark, so perhaps there are more holes I'm not aware of ... Imran On Fri, Feb 13, 2015 at 8:36 AM, Karlson ksonsp...@siberie.de wrote: In https://github.com/apache/spark/blob/master/python/pyspark/join.py#L38, wouldn't it help to change the lines vs = rdd.map(lambda (k, v): (k, (1, v))) ws = other.map(lambda (k, v): (k, (2, v))) to vs = rdd.mapValues(lambda v: (1, v)) ws = other.mapValues(lambda v: (2, v)) ? As I understand, this would preserve the original partitioning. On 2015-02-13 12:43, Karlson wrote: Does that mean partitioning does not work in Python? Or does this only affect joining? On 2015-02-12 19:27, Davies Liu wrote: The feature works as expected in Scala/Java, but is not implemented in Python. On Thu, Feb 12, 2015 at 9:24 AM, Imran Rashid iras...@cloudera.com wrote: I wonder if the issue is that these lines just need to add preservesPartitioning = true? https://github.com/apache/spark/blob/master/python/pyspark/join.py#L38 I am getting the feeling this is an issue w/ pyspark On Thu, Feb 12, 2015 at 10:43 AM, Imran Rashid iras...@cloudera.com wrote: ah, sorry I am not too familiar w/ pyspark, sorry I missed that part. It could be that pyspark doesn't properly support narrow dependencies, or maybe you need to be more explicit about the partitioner. I am looking into the pyspark api but you might have some better guesses here than I thought. My suggestion to do joinedRdd.getPartitions.foreach{println} was just to see if the partition was a NarrowCoGroupSplitDep or a ShuffleCoGroupSplitDep -- but it was actually a bad suggestion, those fields are hidden deeper inside and are not user-visible. But I think a better way (in scala, anyway) is to look at rdd.dependencies. It's a little tricky, though, you need to look deep into the lineage (example at the end). Sean -- yes it does require both RDDs have the same partitioner, but that should happen naturally if you just specify the same number of partitions: you'll get equal HashPartitioners. There is a little difference between the scala and python apis that I missed here. For partitionBy in scala, you actually need to specify the partitioner, but not in python. However I thought it would work like groupByKey, which does just take an int. Here's a code example in scala -- not sure what is available from python. Hopefully somebody knows a simpler way to confirm narrow dependencies??
val d = sc.parallelize(1 to 1e6.toInt).map{x => x -> x}.groupByKey(64) val d2 = sc.parallelize(3 to 1e6.toInt).map{x => x -> x}.groupByKey(64) scala> d.partitioner == d2.partitioner res2: Boolean = true val joined = d.join(d2) val d3 = sc.parallelize(3 to 1e6.toInt).map{x => x -> x}.groupByKey(100) val badJoined = d.join(d3) d.setName("d") d2.setName("d2") d3.setName("d3") joined.setName("joined") badJoined.setName("badJoined") //unfortunately, just looking at the immediate dependencies of joined / badJoined is misleading, b/c join actually creates // one more step after the shuffle scala> joined.dependencies res20: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@74751ac8) //even with the join that does require a shuffle, we still see a OneToOneDependency, but that's just a simple flatMap step scala> badJoined.dependencies res21: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@1cf356cc) //so lets make a helper function to get all the dependencies recursively def flattenDeps(rdd: RDD[_]): Seq[(RDD[_], Dependency[_])] = { val deps = rdd.dependencies deps.map{rdd -> _} ++ deps.flatMap{dep => flattenDeps(dep.rdd)} } //full dependencies of the good join scala> flattenDeps(joined).foreach{println} (joined FlatMappedValuesRDD[9] at join at <console>:16,org.apache.spark.OneToOneDependency@74751ac8) (MappedValuesRDD[8] at join at <console>:16,org.apache.spark.OneToOneDependency@623264af) (CoGroupedRDD[7] at join at <console>:16,org.apache.spark.OneToOneDependency@5a704f86) (CoGroupedRDD[7] at join at <console>:16,org.apache.spark.OneToOneDependency@37514cd) (d
Re: Problem getting pyspark-cassandra and pyspark working
Can you try the example in pyspark-cassandra? If not, you could create an issue there. On Mon, Feb 16, 2015 at 4:07 PM, Mohamed Lrhazi mohamed.lrh...@georgetown.edu wrote: So I tried building the connector from: https://github.com/datastax/spark-cassandra-connector which seems to include the java class referenced in the error message: [root@devzero spark]# unzip -l spark-cassandra-connector/spark-cassandra-connector-java/target/scala-2.10/spark-cassandra-connector-java-assembly-1.2.0-SNAPSHOT.jar |grep CassandraJavaUtil 14612 02-16-2015 23:25 com/datastax/spark/connector/japi/CassandraJavaUtil.class [root@devzero spark]# When I try running my spark test job, I still get the exact same error, even though both my jars seem to have been processed by Spark. ... 15/02/17 00:00:45 INFO SparkUI: Started SparkUI at http://devzero:4040 15/02/17 00:00:45 INFO SparkContext: Added JAR file:/spark/pyspark-cassandra-0.1-SNAPSHOT.jar at http://10.212.55.42:36929/jars/pyspark-cassandra-0.1-SNAPSHOT.jar with timestamp 1424131245595 15/02/17 00:00:45 INFO SparkContext: Added JAR file:/spark/spark-cassandra-connector-java-assembly-1.2.0-SNAPSHOT.jar at http://10.212.55.42:36929/jars/spark-cassandra-connector-java-assembly-1.2.0-SNAPSHOT.jar with timestamp 1424131245623 15/02/17 00:00:45 INFO Utils: Copying /spark/test2.py to /tmp/spark-8588b528-d016-42ac-aa7c-e8cf07c1b659/spark-ae3141dd-ae6c-4e99-b7c8-f97ccb3fd8e5/test2.py 15/02/17 00:00:45 INFO SparkContext: Added file file:/spark/test2.py at http://10.212.55.42:36929/files/test2.py with timestamp 1424131245624 15/02/17 00:00:45 INFO Utils: Copying /spark/pyspark_cassandra.py to /tmp/spark-8588b528-d016-42ac-aa7c-e8cf07c1b659/spark-ae3141dd-ae6c-4e99-b7c8-f97ccb3fd8e5/pyspark_cassandra.py 15/02/17 00:00:45 INFO SparkContext: Added file file:/spark/pyspark_cassandra.py at http://10.212.55.42:36929/files/pyspark_cassandra.py with timestamp 1424131245633 15/02/17 00:00:45 INFO Executor: Starting executor ID driver on host localhost 15/ 15/02/17 00:00:47 INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut down. Traceback (most recent call last): File /spark/test2.py, line 5, in module sc = CassandraSparkContext(conf=conf) File /spark/python/pyspark/context.py, line 105, in __init__ conf, jsc) File /spark/pyspark_cassandra.py, line 17, in _do_init self._jcsc = self._jvm.CassandraJavaUtil.javaFunctions(self._jsc) File /spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py, line 726, in __getattr__ py4j.protocol.Py4JError: Trying to call a package. Am I building the wrong connector jar? Or using the wrong jar? Thanks a lot, Mohamed. On Mon, Feb 16, 2015 at 5:46 PM, Mohamed Lrhazi mohamed.lrh...@georgetown.edu wrote: Oh, I don't know. Thanks a lot Davies, gonna figure that out now On Mon, Feb 16, 2015 at 5:31 PM, Davies Liu dav...@databricks.com wrote: It also needs the Cassandra jar: com.datastax.spark.connector.CassandraJavaUtil Is it included in /spark/pyspark-cassandra-0.1-SNAPSHOT.jar ? On Mon, Feb 16, 2015 at 1:20 PM, Mohamed Lrhazi mohamed.lrh...@georgetown.edu wrote: Yes, I am sure the system can't find the jar.. but how do I fix that...
My submit command includes the jar: /spark/bin/spark-submit --py-files /spark/pyspark_cassandra.py --jars /spark/pyspark-cassandra-0.1-SNAPSHOT.jar --driver-class-path /spark/pyspark-cassandra-0.1-SNAPSHOT.jar /spark/test2.py and the spark output seems to indicate it is handling it: 15/02/16 05:58:46 INFO SparkContext: Added JAR file:/spark/pyspark-cassandra-0.1-SNAPSHOT.jar at http://10.212.55.42:56642/jars/pyspark-cassandra-0.1-SNAPSHOT.jar with timestamp 1424066326632 I don't really know what else I could try; any suggestions highly appreciated. Thanks, Mohamed. On Mon, Feb 16, 2015 at 4:04 PM, Davies Liu dav...@databricks.com wrote: It seems that the jar for Cassandra is not loaded; you should have it in the classpath. On Mon, Feb 16, 2015 at 12:08 PM, Mohamed Lrhazi mohamed.lrh...@georgetown.edu wrote: Hello all, Trying the example code from this package (https://github.com/Parsely/pyspark-cassandra), I always get this error... Can you see what I am doing wrong? From googling around it seems to be that the jar is not found somehow... The spark log shows the JAR was processed at least. Thank you so much. I am using spark-1.2.1-bin-hadoop2.4.tgz test2.py is simply: from pyspark.context import SparkConf from pyspark_cassandra import CassandraSparkContext, saveToCassandra conf = SparkConf().setAppName("PySpark Cassandra Sample Driver") conf.set("spark.cassandra.connection.host", "devzero") sc = CassandraSparkContext(conf=conf) [root@devzero spark]# /usr/local/bin/docker-enter spark-master bash -c /spark/bin/spark-submit --py-files /spark/pyspark_cassandra.py --jars
Re: Magic number 16: Why doesn't Spark Streaming process more than 16 files?
I've managed to solve this, but I still don't know exactly why my solution works: In my code I was trying to force Spark to output via: jsonIn.print(); jsonIn being a JavaDStream<String>. When I removed the code above and added the code below to force the output operation, and hence the execution: jsonIn.foreachRDD(new Function<JavaRDD<String>, Void>() { @Override public Void call(JavaRDD<String> stringJavaRDD) throws Exception { stringJavaRDD.collect(); return null; } }); It works as I expect, processing all of the 20 files I give to it, instead of stopping at 16. -- Emre On Mon, Feb 16, 2015 at 12:56 PM, Emre Sevinc emre.sev...@gmail.com wrote: Hello, I have an application in Java that uses Spark Streaming 1.2.1 in the following manner: - Listen to the input directory. - If a new file is copied to that input directory, process it. - Process: contact a RESTful web service (running also locally and responsive), send the contents of the file, receive the response from the web service, write the results as a new file into the output directory - batch interval : 30 seconds - checkpoint interval: 150 seconds When I test the application locally with 1 or 2 files, it works perfectly fine as expected. I run it like: spark-submit --class myClass --verbose --master local[4] --deploy-mode client myApp.jar /in file:///out But then I've realized something strange when I copied 20 files to the INPUT directory: Spark Streaming detects all of the files, but it ends up processing *only 16 files*. And the remaining 4 are not processed at all. I've tried it with 19, 18, and then 17 files. Same result, only 16 files end up in the output directory. Then I've tried it by copying 16 files at once to the input directory, and it can process all of the 16 files. That's why I call it magic number 16. When I say it detects all of the files, I mean that in the logs I see the following lines when I copy 17 files: === 2015-02-16 12:30:51 INFO SpotlightDriver:70 - spark.executor.memory: 1G 2015-02-16 12:30:51 WARN Utils:71 - Your hostname, emre-ubuntu resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface eth0) 2015-02-16 12:30:51 WARN Utils:71 - Set SPARK_LOCAL_IP if you need to bind to another address 2015-02-16 12:30:52 INFO Slf4jLogger:80 - Slf4jLogger started 2015-02-16 12:30:52 WARN NativeCodeLoader:62 - Unable to load native-hadoop library for your platform...
using builtin-java classes where applicable 2015-02-16 12:30:53 INFO WriteAheadLogManager for ReceivedBlockHandlerMaster:59 - Recovered 2 write ahead log files from file:/tmp/receivedBlockMetadata 2015-02-16 12:30:53 INFO WriteAheadLogManager for ReceivedBlockHandlerMaster:59 - Reading from the logs: file:/tmp/receivedBlockMetadata/log-1424086110599-1424086170599 file:/tmp/receivedBlockMetadata/log-1424086200861-1424086260861 --- Time: 142408626 ms --- 2015-02-16 12:31:00 INFO WriteAheadLogManager for ReceivedBlockHandlerMaster:59 - Attempting to clear 0 old log files in file:/tmp/receivedBlockMetadata older than 142408596: 2015-02-16 12:31:00 INFO WriteAheadLogManager for ReceivedBlockHandlerMaster:59 - Cleared log files in file:/tmp/receivedBlockMetadata older than 142408596 2015-02-16 12:31:00 INFO WriteAheadLogManager for ReceivedBlockHandlerMaster:59 - Attempting to clear 0 old log files in file:/tmp/receivedBlockMetadata older than 142408596: 2015-02-16 12:31:00 INFO WriteAheadLogManager for ReceivedBlockHandlerMaster:59 - Cleared log files in file:/tmp/receivedBlockMetadata older than 142408596 2015-02-16 12:31:30 INFO FileInputFormat:280 - Total input paths to process : 1 2015-02-16 12:31:30 INFO FileInputFormat:280 - Total input paths to process : 1 2015-02-16 12:31:30 INFO FileInputFormat:280 - Total input paths to process : 1 2015-02-16 12:31:30 INFO FileInputFormat:280 - Total input paths to process : 1 2015-02-16 12:31:30 INFO FileInputFormat:280 - Total input paths to process : 1 2015-02-16 12:31:30 INFO FileInputFormat:280 - Total input paths to process : 1 2015-02-16 12:31:30 INFO FileInputFormat:280 - Total input paths to process : 1 2015-02-16 12:31:30 INFO FileInputFormat:280 - Total input paths to process : 1 2015-02-16 12:31:30 INFO FileInputFormat:280 - Total input paths to process : 1 2015-02-16 12:31:30 INFO FileInputFormat:280 - Total input paths to process : 1 2015-02-16 12:31:30 INFO FileInputFormat:280 - Total input paths to process : 1 2015-02-16 12:31:31 INFO FileInputFormat:280 - Total input paths to process : 1 2015-02-16 12:31:31 INFO FileInputFormat:280 - Total input paths to process : 1 2015-02-16 12:31:31 INFO
Re: hive-thriftserver maven artifact
I searched for 'spark-hive-thriftserver_2.10' on this page: http://mvnrepository.com/artifact/org.apache.spark Looks like it is not published. On Mon, Feb 16, 2015 at 5:44 AM, Marco marco@gmail.com wrote: Hi, I am referring to https://issues.apache.org/jira/browse/SPARK-4925 (Hive Thriftserver Maven Artifact). Can somebody point me (URL) to the artifact in a public repository ? I have not found it @Maven Central. Thanks, Marco
Re: Extract hour from Timestamp in Spark SQL
Dear Cheng Hao, You are right! After using the HiveContext, the issue is solved. Thanks, Wush 2015-02-15 10:42 GMT+08:00 Cheng, Hao hao.ch...@intel.com: Are you using the SQLContext? I think the HiveContext is recommended. Cheng Hao *From:* Wush Wu [mailto:w...@bridgewell.com] *Sent:* Thursday, February 12, 2015 2:24 PM *To:* u...@spark.incubator.apache.org *Subject:* Extract hour from Timestamp in Spark SQL Dear all, I am new to Spark SQL and have no experience with Hive. I tried to use the built-in Hive function to extract the hour from a timestamp in Spark SQL, but got: java.util.NoSuchElementException: key not found: hour How should I extract the hour from a timestamp? And I am very confused about which functions I can use in Spark SQL. Is there any list of available functions other than http://spark.apache.org/docs/1.2.0/sql-programming-guide.html#compatibility-with-apache-hive ? Thanks, Wush
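A minimal sketch of the fix described above, assuming Spark 1.2 and a temp table named logs with a timestamp column ts (the table and column names are made up for illustration):

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)
// hour() is a Hive UDF, which a plain SQLContext does not resolve
val hours = hiveContext.sql("SELECT hour(ts) FROM logs")
hours.collect().foreach(println)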
Re: Magic number 16: Why doesn't Spark Streaming process more than 16 files?
Sean, In this case, I've been testing the code on my local machine and using Spark locally, so all the log output was available on my terminal. And I've used the .print() method to have an output operation, just to force Spark to execute. I was not using foreachRDD, only the print() method on a JavaDStream<String> object, and it was working fine for a few files, up to 16 (and without print() it did not do anything, because there were no output operations). To sum it up, in my case: - Initially, use .print() and no foreachRDD: processes up to 16 files and does not do anything for the remaining 4. - Remove .print() and use foreachRDD: processes all of the 20 files. Maybe, as Akhil Das suggested, using .count().print() might also have fixed my problem, but I'm satisfied with the foreachRDD approach for now. (Though it is still a mystery to me why the choice of .print() made a difference. Maybe my mental model of Spark is wrong: I thought that no matter which output operation I used, the number of files processed would be independent of it, because the processing is done in a different method and .print() only forces Spark to execute that processing. Am I wrong?) -- Emre On Mon, Feb 16, 2015 at 2:26 PM, Sean Owen so...@cloudera.com wrote: Materialization shouldn't be relevant. The collect by itself doesn't let you detect whether it happened. Print should print some results to the console but on different machines, so it may not be a reliable way to see what happened. Yes, I understand your real process uses foreachRDD and that's what you should use. It sounds like that works. But you must always have been using that, right? What do you mean that you changed to use it? Basically I'm not clear on what the real code does and what about the output of that code tells you only 16 files were processed. On Feb 16, 2015 1:18 PM, Emre Sevinc emre.sev...@gmail.com wrote: Hello Sean, I did not understand your question very well, but what I do is check the output directory (and I have various logger outputs at various stages showing the contents of an input file being processed, the response from the web service, etc.). By the way, I've already solved my problem by using foreachRDD instead of print (see my second message in this thread). Apparently forcing Spark to materialize the DAG via print() is not the way to go. (My interpretation might be wrong, but this is what I've just seen in my case.) -- Emre On Mon, Feb 16, 2015 at 2:11 PM, Sean Owen so...@cloudera.com wrote: How are you deciding whether files are processed or not? It doesn't seem possible from this code. Maybe it just seems so. On Feb 16, 2015 12:51 PM, Emre Sevinc emre.sev...@gmail.com wrote: I've managed to solve this, but I still don't know exactly why my solution works: In my code I was trying to force Spark to output via: jsonIn.print(); jsonIn being a JavaDStream<String>. When I removed the code above and added the code below to force the output operation, and hence the execution: jsonIn.foreachRDD(new Function<JavaRDD<String>, Void>() { @Override public Void call(JavaRDD<String> stringJavaRDD) throws Exception { stringJavaRDD.collect(); return null; } }); it works as I expect, processing all of the 20 files I give to it, instead of stopping at 16. -- Emre On Mon, Feb 16, 2015 at 12:56 PM, Emre Sevinc emre.sev...@gmail.com wrote: Hello, I have an application in Java that uses Spark Streaming 1.2.1 in the following manner: - Listen to the input directory. - If a new file is copied to that input directory, process it.
- Process: contact a RESTful web service (running also locally and responsive), send the contents of the file, receive the response from the web service, write the results as a new file into the output directory - batch interval: 30 seconds - checkpoint interval: 150 seconds When I test the application locally with 1 or 2 files, it works perfectly fine as expected. I run it like: spark-submit --class myClass --verbose --master local[4] --deploy-mode client myApp.jar /in file:///out But then I noticed something strange when I copied 20 files to the INPUT directory: Spark Streaming detects all of the files, but it ends up processing *only 16 files*. The remaining 4 are not processed at all. I've tried it with 19, 18, and then 17 files. Same result: only 16 files end up in the output directory. Then I tried copying 16 files at once to the input directory, and it can process all of the 16 files. That's why I call it magic number 16. When I say it detects all of the files, I mean that in the logs I see the following lines when I copy 17 files: === 2015-02-16 12:30:51 INFO SpotlightDriver:70 - spark.executor.memory: 1G 2015-02-16 12:30:51 WARN
Re: Magic number 16: Why doesn't Spark Streaming process more than 16 files?
How are you deciding whether files are processed or not? It doesn't seem possible from this code. Maybe it just seems so. On Feb 16, 2015 12:51 PM, Emre Sevinc emre.sev...@gmail.com wrote: I've managed to solve this, but I still don't know exactly why my solution works: In my code I was trying to force Spark to output via: jsonIn.print(); jsonIn being a JavaDStream<String>. When I removed the code above and added the code below to force the output operation, and hence the execution: jsonIn.foreachRDD(new Function<JavaRDD<String>, Void>() { @Override public Void call(JavaRDD<String> stringJavaRDD) throws Exception { stringJavaRDD.collect(); return null; } }); it works as I expect, processing all of the 20 files I give to it, instead of stopping at 16. -- Emre On Mon, Feb 16, 2015 at 12:56 PM, Emre Sevinc emre.sev...@gmail.com wrote: Hello, I have an application in Java that uses Spark Streaming 1.2.1 in the following manner: - Listen to the input directory. - If a new file is copied to that input directory, process it. - Process: contact a RESTful web service (running also locally and responsive), send the contents of the file, receive the response from the web service, write the results as a new file into the output directory - batch interval: 30 seconds - checkpoint interval: 150 seconds When I test the application locally with 1 or 2 files, it works perfectly fine as expected. I run it like: spark-submit --class myClass --verbose --master local[4] --deploy-mode client myApp.jar /in file:///out But then I noticed something strange when I copied 20 files to the INPUT directory: Spark Streaming detects all of the files, but it ends up processing *only 16 files*. The remaining 4 are not processed at all. I've tried it with 19, 18, and then 17 files. Same result: only 16 files end up in the output directory. Then I tried copying 16 files at once to the input directory, and it can process all of the 16 files. That's why I call it magic number 16. When I say it detects all of the files, I mean that in the logs I see the following lines when I copy 17 files: === 2015-02-16 12:30:51 INFO SpotlightDriver:70 - spark.executor.memory: 1G 2015-02-16 12:30:51 WARN Utils:71 - Your hostname, emre-ubuntu resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface eth0) 2015-02-16 12:30:51 WARN Utils:71 - Set SPARK_LOCAL_IP if you need to bind to another address 2015-02-16 12:30:52 INFO Slf4jLogger:80 - Slf4jLogger started 2015-02-16 12:30:52 WARN NativeCodeLoader:62 - Unable to load native-hadoop library for your platform...
using builtin-java classes where applicable 2015-02-16 12:30:53 INFO WriteAheadLogManager for ReceivedBlockHandlerMaster:59 - Recovered 2 write ahead log files from file:/tmp/receivedBlockMetadata 2015-02-16 12:30:53 INFO WriteAheadLogManager for ReceivedBlockHandlerMaster:59 - Reading from the logs: file:/tmp/receivedBlockMetadata/log-1424086110599-1424086170599 file:/tmp/receivedBlockMetadata/log-1424086200861-1424086260861 --- Time: 142408626 ms --- 2015-02-16 12:31:00 INFO WriteAheadLogManager for ReceivedBlockHandlerMaster:59 - Attempting to clear 0 old log files in file:/tmp/receivedBlockMetadata older than 142408596: 2015-02-16 12:31:00 INFO WriteAheadLogManager for ReceivedBlockHandlerMaster:59 - Cleared log files in file:/tmp/receivedBlockMetadata older than 142408596 2015-02-16 12:31:00 INFO WriteAheadLogManager for ReceivedBlockHandlerMaster:59 - Attempting to clear 0 old log files in file:/tmp/receivedBlockMetadata older than 142408596: 2015-02-16 12:31:00 INFO WriteAheadLogManager for ReceivedBlockHandlerMaster:59 - Cleared log files in file:/tmp/receivedBlockMetadata older than 142408596 2015-02-16 12:31:30 INFO FileInputFormat:280 - Total input paths to process : 1 2015-02-16 12:31:30 INFO FileInputFormat:280 - Total input paths to process : 1 2015-02-16 12:31:30 INFO FileInputFormat:280 - Total input paths to process : 1 2015-02-16 12:31:30 INFO FileInputFormat:280 - Total input paths to process : 1 2015-02-16 12:31:30 INFO FileInputFormat:280 - Total input paths to process : 1 2015-02-16 12:31:30 INFO FileInputFormat:280 - Total input paths to process : 1 2015-02-16 12:31:30 INFO FileInputFormat:280 - Total input paths to process : 1 2015-02-16 12:31:30 INFO FileInputFormat:280 - Total input paths to process : 1 2015-02-16 12:31:30 INFO FileInputFormat:280 - Total input paths to process : 1 2015-02-16 12:31:30 INFO FileInputFormat:280 - Total input paths to process : 1 2015-02-16 12:31:30 INFO FileInputFormat:280 - Total input paths
hive-thriftserver maven artifact
Hi, I am referring to https://issues.apache.org/jira/browse/SPARK-4925 (Hive Thriftserver Maven Artifact). Can somebody point me (a URL) to the artifact in a public repository? I have not found it in Maven Central. Thanks, Marco
Re: Magic number 16: Why doesn't Spark Streaming process more than 16 files?
Hello Sean, I did not understand your question very well, but what I do is check the output directory (and I have various logger outputs at various stages showing the contents of an input file being processed, the response from the web service, etc.). By the way, I've already solved my problem by using foreachRDD instead of print (see my second message in this thread). Apparently forcing Spark to materialize the DAG via print() is not the way to go. (My interpretation might be wrong, but this is what I've just seen in my case.) -- Emre On Mon, Feb 16, 2015 at 2:11 PM, Sean Owen so...@cloudera.com wrote: How are you deciding whether files are processed or not? It doesn't seem possible from this code. Maybe it just seems so. On Feb 16, 2015 12:51 PM, Emre Sevinc emre.sev...@gmail.com wrote: I've managed to solve this, but I still don't know exactly why my solution works: In my code I was trying to force Spark to output via: jsonIn.print(); jsonIn being a JavaDStream<String>. When I removed the code above and added the code below to force the output operation, and hence the execution: jsonIn.foreachRDD(new Function<JavaRDD<String>, Void>() { @Override public Void call(JavaRDD<String> stringJavaRDD) throws Exception { stringJavaRDD.collect(); return null; } }); it works as I expect, processing all of the 20 files I give to it, instead of stopping at 16. -- Emre On Mon, Feb 16, 2015 at 12:56 PM, Emre Sevinc emre.sev...@gmail.com wrote: Hello, I have an application in Java that uses Spark Streaming 1.2.1 in the following manner: - Listen to the input directory. - If a new file is copied to that input directory, process it. - Process: contact a RESTful web service (running also locally and responsive), send the contents of the file, receive the response from the web service, write the results as a new file into the output directory - batch interval: 30 seconds - checkpoint interval: 150 seconds When I test the application locally with 1 or 2 files, it works perfectly fine as expected. I run it like: spark-submit --class myClass --verbose --master local[4] --deploy-mode client myApp.jar /in file:///out But then I noticed something strange when I copied 20 files to the INPUT directory: Spark Streaming detects all of the files, but it ends up processing *only 16 files*. The remaining 4 are not processed at all. I've tried it with 19, 18, and then 17 files. Same result: only 16 files end up in the output directory. Then I tried copying 16 files at once to the input directory, and it can process all of the 16 files. That's why I call it magic number 16. When I say it detects all of the files, I mean that in the logs I see the following lines when I copy 17 files: === 2015-02-16 12:30:51 INFO SpotlightDriver:70 - spark.executor.memory: 1G 2015-02-16 12:30:51 WARN Utils:71 - Your hostname, emre-ubuntu resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface eth0) 2015-02-16 12:30:51 WARN Utils:71 - Set SPARK_LOCAL_IP if you need to bind to another address 2015-02-16 12:30:52 INFO Slf4jLogger:80 - Slf4jLogger started 2015-02-16 12:30:52 WARN NativeCodeLoader:62 - Unable to load native-hadoop library for your platform...
using builtin-java classes where applicable 2015-02-16 12:30:53 INFO WriteAheadLogManager for ReceivedBlockHandlerMaster:59 - Recovered 2 write ahead log files from file:/tmp/receivedBlockMetadata 2015-02-16 12:30:53 INFO WriteAheadLogManager for ReceivedBlockHandlerMaster:59 - Reading from the logs: file:/tmp/receivedBlockMetadata/log-1424086110599-1424086170599 file:/tmp/receivedBlockMetadata/log-1424086200861-1424086260861 --- Time: 142408626 ms --- 2015-02-16 12:31:00 INFO WriteAheadLogManager for ReceivedBlockHandlerMaster:59 - Attempting to clear 0 old log files in file:/tmp/receivedBlockMetadata older than 142408596: 2015-02-16 12:31:00 INFO WriteAheadLogManager for ReceivedBlockHandlerMaster:59 - Cleared log files in file:/tmp/receivedBlockMetadata older than 142408596 2015-02-16 12:31:00 INFO WriteAheadLogManager for ReceivedBlockHandlerMaster:59 - Attempting to clear 0 old log files in file:/tmp/receivedBlockMetadata older than 142408596: 2015-02-16 12:31:00 INFO WriteAheadLogManager for ReceivedBlockHandlerMaster:59 - Cleared log files in file:/tmp/receivedBlockMetadata older than 142408596 2015-02-16 12:31:30 INFO FileInputFormat:280 - Total input paths to process : 1 2015-02-16 12:31:30 INFO FileInputFormat:280 - Total input paths to process : 1 2015-02-16 12:31:30 INFO FileInputFormat:280 - Total input paths to process : 1 2015-02-16 12:31:30 INFO
Re: Magic number 16: Why doesn't Spark Streaming process more than 16 files?
Materialization shouldn't be relevant. The collect by itself doesn't let you detect whether it happened. Print should print some results to the console but on different machines, so it may not be a reliable way to see what happened. Yes, I understand your real process uses foreachRDD and that's what you should use. It sounds like that works. But you must always have been using that, right? What do you mean that you changed to use it? Basically I'm not clear on what the real code does and what about the output of that code tells you only 16 files were processed. On Feb 16, 2015 1:18 PM, Emre Sevinc emre.sev...@gmail.com wrote: Hello Sean, I did not understand your question very well, but what I do is check the output directory (and I have various logger outputs at various stages showing the contents of an input file being processed, the response from the web service, etc.). By the way, I've already solved my problem by using foreachRDD instead of print (see my second message in this thread). Apparently forcing Spark to materialize the DAG via print() is not the way to go. (My interpretation might be wrong, but this is what I've just seen in my case.) -- Emre On Mon, Feb 16, 2015 at 2:11 PM, Sean Owen so...@cloudera.com wrote: How are you deciding whether files are processed or not? It doesn't seem possible from this code. Maybe it just seems so. On Feb 16, 2015 12:51 PM, Emre Sevinc emre.sev...@gmail.com wrote: I've managed to solve this, but I still don't know exactly why my solution works: In my code I was trying to force Spark to output via: jsonIn.print(); jsonIn being a JavaDStream<String>. When I removed the code above and added the code below to force the output operation, and hence the execution: jsonIn.foreachRDD(new Function<JavaRDD<String>, Void>() { @Override public Void call(JavaRDD<String> stringJavaRDD) throws Exception { stringJavaRDD.collect(); return null; } }); it works as I expect, processing all of the 20 files I give to it, instead of stopping at 16. -- Emre On Mon, Feb 16, 2015 at 12:56 PM, Emre Sevinc emre.sev...@gmail.com wrote: Hello, I have an application in Java that uses Spark Streaming 1.2.1 in the following manner: - Listen to the input directory. - If a new file is copied to that input directory, process it. - Process: contact a RESTful web service (running also locally and responsive), send the contents of the file, receive the response from the web service, write the results as a new file into the output directory - batch interval: 30 seconds - checkpoint interval: 150 seconds When I test the application locally with 1 or 2 files, it works perfectly fine as expected. I run it like: spark-submit --class myClass --verbose --master local[4] --deploy-mode client myApp.jar /in file:///out But then I noticed something strange when I copied 20 files to the INPUT directory: Spark Streaming detects all of the files, but it ends up processing *only 16 files*. The remaining 4 are not processed at all. I've tried it with 19, 18, and then 17 files. Same result: only 16 files end up in the output directory. Then I tried copying 16 files at once to the input directory, and it can process all of the 16 files. That's why I call it magic number 16.
When I say it detects all of the files, I mean that in the logs I see the following lines when I copy 17 files: === 2015-02-16 12:30:51 INFO SpotlightDriver:70 - spark.executor.memory: 1G 2015-02-16 12:30:51 WARN Utils:71 - Your hostname, emre-ubuntu resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface eth0) 2015-02-16 12:30:51 WARN Utils:71 - Set SPARK_LOCAL_IP if you need to bind to another address 2015-02-16 12:30:52 INFO Slf4jLogger:80 - Slf4jLogger started 2015-02-16 12:30:52 WARN NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 2015-02-16 12:30:53 INFO WriteAheadLogManager for ReceivedBlockHandlerMaster:59 - Recovered 2 write ahead log files from file:/tmp/receivedBlockMetadata 2015-02-16 12:30:53 INFO WriteAheadLogManager for ReceivedBlockHandlerMaster:59 - Reading from the logs: file:/tmp/receivedBlockMetadata/log-1424086110599-1424086170599 file:/tmp/receivedBlockMetadata/log-1424086200861-1424086260861 --- Time: 142408626 ms --- 2015-02-16 12:31:00 INFO WriteAheadLogManager for ReceivedBlockHandlerMaster:59 - Attempting to clear 0 old log files in file:/tmp/receivedBlockMetadata older than 142408596: 2015-02-16 12:31:00 INFO WriteAheadLogManager for ReceivedBlockHandlerMaster:59 - Cleared log files in file:/tmp/receivedBlockMetadata older
Re: Magic number 16: Why doesn't Spark Streaming process more than 16 files?
Instead of print you should do jsonIn.count().print(). A straightforward approach is to use foreachRDD :) Thanks Best Regards On Mon, Feb 16, 2015 at 6:48 PM, Emre Sevinc emre.sev...@gmail.com wrote: Hello Sean, I did not understand your question very well, but what I do is check the output directory (and I have various logger outputs at various stages showing the contents of an input file being processed, the response from the web service, etc.). By the way, I've already solved my problem by using foreachRDD instead of print (see my second message in this thread). Apparently forcing Spark to materialize the DAG via print() is not the way to go. (My interpretation might be wrong, but this is what I've just seen in my case.) -- Emre On Mon, Feb 16, 2015 at 2:11 PM, Sean Owen so...@cloudera.com wrote: How are you deciding whether files are processed or not? It doesn't seem possible from this code. Maybe it just seems so. On Feb 16, 2015 12:51 PM, Emre Sevinc emre.sev...@gmail.com wrote: I've managed to solve this, but I still don't know exactly why my solution works: In my code I was trying to force Spark to output via: jsonIn.print(); jsonIn being a JavaDStream<String>. When I removed the code above and added the code below to force the output operation, and hence the execution: jsonIn.foreachRDD(new Function<JavaRDD<String>, Void>() { @Override public Void call(JavaRDD<String> stringJavaRDD) throws Exception { stringJavaRDD.collect(); return null; } }); it works as I expect, processing all of the 20 files I give to it, instead of stopping at 16. -- Emre On Mon, Feb 16, 2015 at 12:56 PM, Emre Sevinc emre.sev...@gmail.com wrote: Hello, I have an application in Java that uses Spark Streaming 1.2.1 in the following manner: - Listen to the input directory. - If a new file is copied to that input directory, process it. - Process: contact a RESTful web service (running also locally and responsive), send the contents of the file, receive the response from the web service, write the results as a new file into the output directory - batch interval: 30 seconds - checkpoint interval: 150 seconds When I test the application locally with 1 or 2 files, it works perfectly fine as expected. I run it like: spark-submit --class myClass --verbose --master local[4] --deploy-mode client myApp.jar /in file:///out But then I noticed something strange when I copied 20 files to the INPUT directory: Spark Streaming detects all of the files, but it ends up processing *only 16 files*. The remaining 4 are not processed at all. I've tried it with 19, 18, and then 17 files. Same result: only 16 files end up in the output directory. Then I tried copying 16 files at once to the input directory, and it can process all of the 16 files. That's why I call it magic number 16. When I say it detects all of the files, I mean that in the logs I see the following lines when I copy 17 files: === 2015-02-16 12:30:51 INFO SpotlightDriver:70 - spark.executor.memory: 1G 2015-02-16 12:30:51 WARN Utils:71 - Your hostname, emre-ubuntu resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface eth0) 2015-02-16 12:30:51 WARN Utils:71 - Set SPARK_LOCAL_IP if you need to bind to another address 2015-02-16 12:30:52 INFO Slf4jLogger:80 - Slf4jLogger started 2015-02-16 12:30:52 WARN NativeCodeLoader:62 - Unable to load native-hadoop library for your platform...
using builtin-java classes where applicable 2015-02-16 12:30:53 INFO WriteAheadLogManager for ReceivedBlockHandlerMaster:59 - Recovered 2 write ahead log files from file:/tmp/receivedBlockMetadata 2015-02-16 12:30:53 INFO WriteAheadLogManager for ReceivedBlockHandlerMaster:59 - Reading from the logs: file:/tmp/receivedBlockMetadata/log-1424086110599-1424086170599 file:/tmp/receivedBlockMetadata/log-1424086200861-1424086260861 --- Time: 142408626 ms --- 2015-02-16 12:31:00 INFO WriteAheadLogManager for ReceivedBlockHandlerMaster:59 - Attempting to clear 0 old log files in file:/tmp/receivedBlockMetadata older than 142408596: 2015-02-16 12:31:00 INFO WriteAheadLogManager for ReceivedBlockHandlerMaster:59 - Cleared log files in file:/tmp/receivedBlockMetadata older than 142408596 2015-02-16 12:31:00 INFO WriteAheadLogManager for ReceivedBlockHandlerMaster:59 - Attempting to clear 0 old log files in file:/tmp/receivedBlockMetadata older than 142408596: 2015-02-16 12:31:00 INFO WriteAheadLogManager for ReceivedBlockHandlerMaster:59 - Cleared log files in file:/tmp/receivedBlockMetadata older than 142408596 2015-02-16 12:31:30 INFO FileInputFormat:280 - Total input
Question about spark streaming+Flume
Hi, I am trying the Spark Streaming + Flume example: 1. Code:

object SparkFlumeNGExample {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("SparkFlumeNGExample")
    val ssc = new StreamingContext(conf, Seconds(10))
    val lines = FlumeUtils.createStream(ssc, "localhost", ...)
    // Print out the count of events received from this server in each batch
    lines.count().map(cnt => "Received " + cnt + " flume events. at " + System.currentTimeMillis()).print()
    ssc.start()
    ssc.awaitTermination()
  }
}

2. I submit the application with the following sh: ./spark-submit --deploy-mode client --name SparkFlumeEventCount --master spark://hadoop.master:7077 --executor-memory 512M --total-executor-cores 2 --class spark.examples.streaming.SparkFlumeNGWordCount spark-streaming-flume.jar When I write data to Flume, I only see the following console output showing that input was added: storage.BlockManagerInfo: Added input-0-1424151807400 in memory on localhost:39338 (size: 1095.0 B, free: 267.2 MB) 15/02/17 00:43:30 INFO scheduler.JobScheduler: Added jobs for time 142415181 ms 15/02/17 00:43:40 INFO scheduler.JobScheduler: Added jobs for time 142415182 ms 15/02/17 00:43:40 INFO scheduler.JobScheduler: Added jobs for time 142415187 ms But I didn't see the output from the code: "Received X flume events". I have no idea where the problem is. Any ideas? Thanks
Identify the performance bottleneck from hardware perspective
Hi there, I am trying to scale up the data size that my application is handling. The application runs on a cluster with 16 slave nodes, each with 60GB of memory, in standalone mode. The data comes from HDFS, which is in the same local network. In order to understand how my program is running, I also have Ganglia installed on the cluster. From previous runs, I know the stage taking the longest time is counting word pairs (my RDD consists of sentences from a corpus). My goal is to identify the bottleneck of my application, then modify my program or hardware configuration accordingly. Unfortunately, I didn't find much information on Spark monitoring and optimization topics. Reynold Xin gave a great talk at Spark Summit 2014 on application tuning from the tasks perspective; basically, his focus was on tasks that are oddly slower than the average. However, that didn't solve my problem, because in my case there are no tasks that run much slower than the others. So I tried to identify the bottleneck from the hardware perspective. I want to know what the limitation of the cluster is. I think that if the executors are running hard, then either CPU, memory or network bandwidth (or some combination) should be hitting the roof. But Ganglia reports that the CPU utilization of the cluster is no more than 50%, and network utilization is high for several seconds at the beginning, then drops close to 0. From the Spark UI, I can see the node with maximum memory usage is consuming around 6GB, while spark.executor.memory is set to 20GB. I am very confused: the program is not running fast enough, yet the hardware resources are not in shortage. Could you please give me some hints about what decides the performance of a Spark application from the hardware perspective? Thanks! Julaiti
Re: Question about spark streaming+Flume
Hi, Can you try this:

val lines = FlumeUtils.createStream(ssc, "localhost", ...)
// Print out the count of events received from this server in each batch
lines.count().map(cnt => "Received " + cnt + " flume events. at " + System.currentTimeMillis())
lines.forechRDD(_.foreach(println))

Thanks Arush On Tue, Feb 17, 2015 at 11:17 AM, bit1...@163.com bit1...@163.com wrote: Hi, I am trying the Spark Streaming + Flume example: 1. Code: object SparkFlumeNGExample { def main(args: Array[String]) { val conf = new SparkConf().setAppName("SparkFlumeNGExample") val ssc = new StreamingContext(conf, Seconds(10)) val lines = FlumeUtils.createStream(ssc, "localhost", ...) // Print out the count of events received from this server in each batch lines.count().map(cnt => "Received " + cnt + " flume events. at " + System.currentTimeMillis()).print() ssc.start() ssc.awaitTermination() } } 2. I submit the application with the following sh: ./spark-submit --deploy-mode client --name SparkFlumeEventCount --master spark://hadoop.master:7077 --executor-memory 512M --total-executor-cores 2 --class spark.examples.streaming.SparkFlumeNGWordCount spark-streaming-flume.jar When I write data to Flume, I only see the following console output showing that input was added: storage.BlockManagerInfo: Added input-0-1424151807400 in memory on localhost:39338 (size: 1095.0 B, free: 267.2 MB) 15/02/17 00:43:30 INFO scheduler.JobScheduler: Added jobs for time 142415181 ms 15/02/17 00:43:40 INFO scheduler.JobScheduler: Added jobs for time 142415182 ms 15/02/17 00:43:40 INFO scheduler.JobScheduler: Added jobs for time 142415187 ms But I didn't see the output from the code: "Received X flume events". I have no idea where the problem is. Any ideas? Thanks -- Arush Kharbanda || Technical Teamlead ar...@sigmoidanalytics.com || www.sigmoidanalytics.com
Re: Re: Question about spark streaming+Flume
Ok, you are missing a letter in foreachRDD.. let me proceed.. bit1...@163.com From: Arush Kharbanda Date: 2015-02-17 14:31 To: bit1...@163.com CC: user Subject: Re: Question about spark streaming+Flume Hi, Can you try this: val lines = FlumeUtils.createStream(ssc, "localhost", ...) // Print out the count of events received from this server in each batch lines.count().map(cnt => "Received " + cnt + " flume events. at " + System.currentTimeMillis()) lines.forechRDD(_.foreach(println)) Thanks Arush On Tue, Feb 17, 2015 at 11:17 AM, bit1...@163.com bit1...@163.com wrote: Hi, I am trying the Spark Streaming + Flume example: 1. Code: object SparkFlumeNGExample { def main(args: Array[String]) { val conf = new SparkConf().setAppName("SparkFlumeNGExample") val ssc = new StreamingContext(conf, Seconds(10)) val lines = FlumeUtils.createStream(ssc, "localhost", ...) // Print out the count of events received from this server in each batch lines.count().map(cnt => "Received " + cnt + " flume events. at " + System.currentTimeMillis()).print() ssc.start() ssc.awaitTermination() } } 2. I submit the application with the following sh: ./spark-submit --deploy-mode client --name SparkFlumeEventCount --master spark://hadoop.master:7077 --executor-memory 512M --total-executor-cores 2 --class spark.examples.streaming.SparkFlumeNGWordCount spark-streaming-flume.jar When I write data to Flume, I only see the following console output showing that input was added: storage.BlockManagerInfo: Added input-0-1424151807400 in memory on localhost:39338 (size: 1095.0 B, free: 267.2 MB) 15/02/17 00:43:30 INFO scheduler.JobScheduler: Added jobs for time 142415181 ms 15/02/17 00:43:40 INFO scheduler.JobScheduler: Added jobs for time 142415182 ms 15/02/17 00:43:40 INFO scheduler.JobScheduler: Added jobs for time 142415187 ms But I didn't see the output from the code: "Received X flume events". I have no idea where the problem is. Any ideas? Thanks -- Arush Kharbanda || Technical Teamlead ar...@sigmoidanalytics.com || www.sigmoidanalytics.com
Re: Re: Question about spark streaming+Flume
Thanks Arush. With your code, a compile error occurs: Error:(19, 11) value forechRDD is not a member of org.apache.spark.streaming.dstream.ReceiverInputDStream[org.apache.spark.streaming.flume.SparkFlumeEvent] lines.forechRDD(_.foreach(println)) ^ From: Arush Kharbanda Date: 2015-02-17 14:31 To: bit1...@163.com CC: user Subject: Re: Question about spark streaming+Flume Hi, Can you try this: val lines = FlumeUtils.createStream(ssc, "localhost", ...) // Print out the count of events received from this server in each batch lines.count().map(cnt => "Received " + cnt + " flume events. at " + System.currentTimeMillis()) lines.forechRDD(_.foreach(println)) Thanks Arush On Tue, Feb 17, 2015 at 11:17 AM, bit1...@163.com bit1...@163.com wrote: Hi, I am trying the Spark Streaming + Flume example: 1. Code: object SparkFlumeNGExample { def main(args: Array[String]) { val conf = new SparkConf().setAppName("SparkFlumeNGExample") val ssc = new StreamingContext(conf, Seconds(10)) val lines = FlumeUtils.createStream(ssc, "localhost", ...) // Print out the count of events received from this server in each batch lines.count().map(cnt => "Received " + cnt + " flume events. at " + System.currentTimeMillis()).print() ssc.start() ssc.awaitTermination() } } 2. I submit the application with the following sh: ./spark-submit --deploy-mode client --name SparkFlumeEventCount --master spark://hadoop.master:7077 --executor-memory 512M --total-executor-cores 2 --class spark.examples.streaming.SparkFlumeNGWordCount spark-streaming-flume.jar When I write data to Flume, I only see the following console output showing that input was added: storage.BlockManagerInfo: Added input-0-1424151807400 in memory on localhost:39338 (size: 1095.0 B, free: 267.2 MB) 15/02/17 00:43:30 INFO scheduler.JobScheduler: Added jobs for time 142415181 ms 15/02/17 00:43:40 INFO scheduler.JobScheduler: Added jobs for time 142415182 ms 15/02/17 00:43:40 INFO scheduler.JobScheduler: Added jobs for time 142415187 ms But I didn't see the output from the code: "Received X flume events". I have no idea where the problem is. Any ideas? Thanks -- Arush Kharbanda || Technical Teamlead ar...@sigmoidanalytics.com || www.sigmoidanalytics.com
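For completeness, a sketch of the corrected code once the typo is fixed (foreachRDD, not forechRDD), with the counting and the printing chained together:

val counts = lines.count().map(cnt =>
  "Received " + cnt + " flume events. at " + System.currentTimeMillis())
counts.foreachRDD(_.foreach(println))

Note that println inside foreachRDD runs on the executors, so in a non-local deployment the output lands in the executor logs rather than on the driver console.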
java.lang.NoClassDefFoundError: org/apache/spark/SparkConf
Hello, I have a simple Kafka Spark Streaming example which I am still developing in standalone mode. Here is what is puzzling me: if I build the assembly jar and use bin/spark-submit to run it, it works fine. But if I run the code from within the IntelliJ IDE, then it fails with this error: Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/SparkConf ... Caused by: java.lang.ClassNotFoundException: org.apache.spark.SparkConf Here is my build.sbt file:

import _root_.sbt.Keys._
import _root_.sbtassembly.Plugin.AssemblyKeys._
import _root_.sbtassembly.Plugin.MergeStrategy
import _root_.sbtassembly.Plugin._
import AssemblyKeys._

assemblySettings

name := "test-kafka"

version := "1.0"

scalaVersion := "2.10.4"

jarName in assembly := "test-kafka-1.0.jar"

assemblyOption in assembly ~= { _.copy(includeScala = false) }

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.2.1" % "provided",
  "org.apache.spark" %% "spark-streaming" % "1.2.1" % "provided",
  ("org.apache.spark" %% "spark-streaming-kafka" % "1.2.1").
    exclude("commons-beanutils", "commons-beanutils").
    exclude("commons-collections", "commons-collections").
    exclude("com.esotericsoftware.minlog", "minlog").
    exclude("commons-logging", "commons-logging")
)

mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) => {
  case x if x.startsWith("META-INF/ECLIPSEF.RSA") => MergeStrategy.last
  case x if x.startsWith("META-INF/mailcap") => MergeStrategy.last
  case x if x.startsWith("plugin.properties") => MergeStrategy.last
  case x => old(x)
}}

I also have this in my project/plugins.sbt:

resolvers += Resolver.url("sbt-plugin-releases-scalasbt", url("http://repo.scala-sbt.org/scalasbt/sbt-plugin-releases/"))

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")

addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.7.4")

*What is even more interesting is that if I pin the Spark jar to 1.1.1 instead of 1.2.1, then I can successfully run it within IntelliJ.*
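One workaround often suggested for this (a sketch, not verified against this exact build): "provided" dependencies are excluded from the runtime classpath, which is what the IDE uses when launching main(), so either drop % "provided" while developing, or re-add the full compile classpath to sbt's run task:

// build.sbt addition (sketch): run with the compile classpath, so
// "provided" Spark jars are visible to sbt run / the IDE's sbt import
run in Compile <<= Defaults.runTask(
  fullClasspath in Compile,
  mainClass in (Compile, run),
  runner in (Compile, run)
)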
Re: hive-thriftserver maven artifact
You can build your own Spark with the -Phive-thriftserver option and publish the jars locally. I hope that solves your problem. On Mon, Feb 16, 2015 at 8:54 PM, Marco marco@gmail.com wrote: Ok, so will it only be available for the next version (1.3.0)? 2015-02-16 15:24 GMT+01:00 Ted Yu yuzhih...@gmail.com: I searched for 'spark-hive-thriftserver_2.10' on this page: http://mvnrepository.com/artifact/org.apache.spark Looks like it is not published. On Mon, Feb 16, 2015 at 5:44 AM, Marco marco@gmail.com wrote: Hi, I am referring to https://issues.apache.org/jira/browse/SPARK-4925 (Hive Thriftserver Maven Artifact). Can somebody point me (a URL) to the artifact in a public repository? I have not found it in Maven Central. Thanks, Marco -- Best regards, Marco -- Arush Kharbanda || Technical Teamlead ar...@sigmoidanalytics.com || www.sigmoidanalytics.com
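A sketch of that build, assuming a Spark 1.2.x source checkout (the profile names come from the Spark build documentation):

mvn -Phive -Phive-thriftserver -DskipTests clean install

mvn install places the resulting spark-hive-thriftserver jar in the local Maven repository, from where other projects can depend on it until an official artifact is published.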
MLlib usage on Spark Streaming
Hello! I'm a newbie to Spark and I have the following case study: 1. A client sends, every 100 ms, the following data: {uniqueId, timestamp, measure1, measure2}. 2. Every 30 seconds I would like to correlate the data collected in the window with a predefined double-vector pattern for each given key. The predefined pattern has 300 records. The data should also be sorted by timestamp. 3. When the correlation is greater than a predefined threshold (e.g. 0.9), I would like to emit a new message containing {uniqueId, doubleCorrelationValue}. 4. For the correlation I would like to use MLlib. 5. As a programming language I would like to use Java 7. Can you please give me some suggestions on how to create the skeleton for the above scenario? Thanks. Regards, Florin
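One possible skeleton, sketched in Scala rather than the requested Java to stay consistent with the other snippets in this digest (the structure maps one-to-one onto the Java API). The names events and patterns are assumptions, and the Pearson correlation is computed locally per key rather than via MLlib's Statistics.corr, which operates on whole RDDs:

import org.apache.spark.SparkContext._
import org.apache.spark.streaming.Seconds

// events: DStream[(String, (Long, Double))] = (uniqueId, (timestamp, measure1))
// patterns: Map[String, Array[Double]] holding the predefined 300-point vectors
val threshold = 0.9

def pearson(x: Array[Double], y: Array[Double]): Double = {
  val n = x.length
  val mx = x.sum / n
  val my = y.sum / n
  val cov = (x, y).zipped.map((a, b) => (a - mx) * (b - my)).sum
  val sx = math.sqrt(x.map(a => (a - mx) * (a - mx)).sum)
  val sy = math.sqrt(y.map(b => (b - my) * (b - my)).sum)
  cov / (sx * sy)
}

events.window(Seconds(30), Seconds(30)).foreachRDD { rdd =>
  val matches = rdd.groupByKey().flatMap { case (id, values) =>
    patterns.get(id).flatMap { p =>
      // sort by timestamp, keep only the measure values
      val series = values.toSeq.sortBy(_._1).map(_._2).toArray
      if (series.length == p.length) {
        val c = pearson(series, p)
        if (c > threshold) Some((id, c)) else None
      } else None
    }
  }
  matches.collect().foreach { case (id, c) => println(id + " -> " + c) }
}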
Re: hive-thriftserver maven artifact
Ok, so will it only be available for the next version (1.3.0)? 2015-02-16 15:24 GMT+01:00 Ted Yu yuzhih...@gmail.com: I searched for 'spark-hive-thriftserver_2.10' on this page: http://mvnrepository.com/artifact/org.apache.spark Looks like it is not published. On Mon, Feb 16, 2015 at 5:44 AM, Marco marco@gmail.com wrote: Hi, I am referring to https://issues.apache.org/jira/browse/SPARK-4925 (Hive Thriftserver Maven Artifact). Can somebody point me (a URL) to the artifact in a public repository? I have not found it in Maven Central. Thanks, Marco -- Best regards, Marco
Can't I mix non-Spark properties into a .properties file and pass it to spark-submit via --properties-file?
Hello, I'm using Spark 1.2.1 and have a module.properties file, and in it I have non-Spark properties as well as Spark properties, e.g.: job.output.dir=file:///home/emre/data/mymodule/out I'm trying to pass it to spark-submit via: spark-submit --class com.myModule --master local[4] --deploy-mode client --verbose --properties-file /home/emre/data/mymodule.properties mymodule.jar And I thought I could read the value of my non-Spark property, namely job.output.dir, by using: SparkConf sparkConf = new SparkConf(); final String validatedJSONoutputDir = sparkConf.get("job.output.dir"); But it gives me an exception: Exception in thread "main" java.util.NoSuchElementException: job.output.dir Is it not possible to mix Spark and non-Spark properties in a single .properties file, then pass it via --properties-file and then get the values of those non-Spark properties via SparkConf? Or is there another object / method to retrieve the values for those non-Spark properties? -- Emre Sevinç
Re: Can't I mix non-Spark properties into a .properties file and pass it to spark-submit via --properties-file?
Since SparkConf is only for Spark properties, I think it will in general only pay attention to and preserve spark.* properties. You could experiment with that. In general I wouldn't rely on Spark mechanisms for your configuration, and you can use any config mechanism you like to retain your own properties. On Mon, Feb 16, 2015 at 3:26 PM, Emre Sevinc emre.sev...@gmail.com wrote: Hello, I'm using Spark 1.2.1 and have a module.properties file, and in it I have non-Spark properties as well as Spark properties, e.g.: job.output.dir=file:///home/emre/data/mymodule/out I'm trying to pass it to spark-submit via: spark-submit --class com.myModule --master local[4] --deploy-mode client --verbose --properties-file /home/emre/data/mymodule.properties mymodule.jar And I thought I could read the value of my non-Spark property, namely job.output.dir, by using: SparkConf sparkConf = new SparkConf(); final String validatedJSONoutputDir = sparkConf.get("job.output.dir"); But it gives me an exception: Exception in thread "main" java.util.NoSuchElementException: job.output.dir Is it not possible to mix Spark and non-Spark properties in a single .properties file, then pass it via --properties-file and then get the values of those non-Spark properties via SparkConf? Or is there another object / method to retrieve the values for those non-Spark properties? -- Emre Sevinç
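One concrete experiment along those lines (an assumption to verify, not a documented contract beyond the spark.* passthrough Sean mentions): namespace your own keys under spark. so that --properties-file and SparkConf preserve them:

# in the properties file (sketch, hypothetical key)
spark.myapp.job.output.dir=file:///home/emre/data/mymodule/out

// in the driver (Scala sketch)
val outputDir = sparkConf.get("spark.myapp.job.output.dir")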
Re: Can't I mix non-Spark properties into a .properties file and pass it to spark-submit via --properties-file?
Sean, I'm trying this as an alternative to what I currently do. Currently I have my module.properties file for my module in the resources directory, and that file is put inside the über JAR file when I build my application with Maven; then when I submit it using spark-submit, I can read that module.properties file via the traditional method: properties.load(MyModule.class.getClassLoader().getResourceAsStream("module.properties")); and everything works fine. The disadvantage is that in order to make any changes to that .properties file effective, I have to re-build my application. Therefore I'm trying to find a way to send that module.properties file via spark-submit and read the values in it, so that I will not be forced to re-build my application every time I want to make a change in the module.properties file. I've also checked the --files option of spark-submit, but I see that it is for sending the listed files to executors (correct me if I'm wrong); what I'm after is being able to pass dynamic properties (key/value pairs) to the Driver program of my Spark application. And I still could not find out how to do that. -- Emre On Mon, Feb 16, 2015 at 4:28 PM, Sean Owen so...@cloudera.com wrote: Since SparkConf is only for Spark properties, I think it will in general only pay attention to and preserve spark.* properties. You could experiment with that. In general I wouldn't rely on Spark mechanisms for your configuration, and you can use any config mechanism you like to retain your own properties. On Mon, Feb 16, 2015 at 3:26 PM, Emre Sevinc emre.sev...@gmail.com wrote: Hello, I'm using Spark 1.2.1 and have a module.properties file, and in it I have non-Spark properties as well as Spark properties, e.g.: job.output.dir=file:///home/emre/data/mymodule/out I'm trying to pass it to spark-submit via: spark-submit --class com.myModule --master local[4] --deploy-mode client --verbose --properties-file /home/emre/data/mymodule.properties mymodule.jar And I thought I could read the value of my non-Spark property, namely job.output.dir, by using: SparkConf sparkConf = new SparkConf(); final String validatedJSONoutputDir = sparkConf.get("job.output.dir"); But it gives me an exception: Exception in thread "main" java.util.NoSuchElementException: job.output.dir Is it not possible to mix Spark and non-Spark properties in a single .properties file, then pass it via --properties-file and then get the values of those non-Spark properties via SparkConf? Or is there another object / method to retrieve the values for those non-Spark properties? -- Emre Sevinç -- Emre Sevinc
Re: Can't I mix non-Spark properties into a .properties file and pass it to spark-submit via --properties-file?
I haven't actually tried mixing non-Spark settings into the Spark properties. Instead I package my properties into the jar and use the Typesafe Config[1] library, v1.2.1 (along with Ficus[2], which is Scala-specific), to get at my properties: Properties file: src/main/resources/integration.conf (below, $ENV might be set to either integration or prod[3]) ssh -t root@$HOST /root/spark/bin/spark-shell --jars /root/$JAR_NAME \ --conf 'config.resource=$ENV.conf' \ --conf 'spark.executor.extraJavaOptions=-Dconfig.resource=$ENV.conf' Since the properties file is packaged up with the JAR I don't have to worry about sending the file separately to all of the slave nodes. Typesafe Config is written in Java, so it will work even if you're not using Scala. (Typesafe Config also has the advantage of being extremely easy to integrate with code that is using Java Properties today.) If you instead want to send the file separately from the JAR and you use the Typesafe Config library, you can specify config.file instead of config.resource; though I'd point you to [3] below if you want to make your development life easier. 1. https://github.com/typesafehub/config 2. https://github.com/ceedubs/ficus 3. http://deploymentzone.com/2015/01/27/spark-ec2-and-easy-spark-shell-deployment/ On Mon Feb 16 2015 at 10:27:01 AM Emre Sevinc emre.sev...@gmail.com wrote: Hello, I'm using Spark 1.2.1 and have a module.properties file, and in it I have non-Spark properties as well as Spark properties, e.g.: job.output.dir=file:///home/emre/data/mymodule/out I'm trying to pass it to spark-submit via: spark-submit --class com.myModule --master local[4] --deploy-mode client --verbose --properties-file /home/emre/data/mymodule.properties mymodule.jar And I thought I could read the value of my non-Spark property, namely job.output.dir, by using: SparkConf sparkConf = new SparkConf(); final String validatedJSONoutputDir = sparkConf.get("job.output.dir"); But it gives me an exception: Exception in thread "main" java.util.NoSuchElementException: job.output.dir Is it not possible to mix Spark and non-Spark properties in a single .properties file, then pass it via --properties-file and then get the values of those non-Spark properties via SparkConf? Or is there another object / method to retrieve the values for those non-Spark properties? -- Emre Sevinç
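A minimal sketch of the read side of this approach (the Typesafe Config calls are real API; the key name is hypothetical):

import com.typesafe.config.ConfigFactory

// Reads application.conf from the classpath by default, or whatever file
// -Dconfig.resource / -Dconfig.file points at, as in the commands above
val config = ConfigFactory.load()
val outputDir = config.getString("job.output.dir")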
Re: Loading tables using parquetFile vs. loading tables from Hive metastore with Parquet serde
Hi Jianshi, When accessing a Hive table with the Parquet SerDe, Spark SQL tries to convert it into Spark SQL's native Parquet support for better performance. And yes, predicate push-down and column pruning are applied here. In 1.3.0, we'll also cover the write path, except for writing partitioned tables. Cheng On Sun Feb 15 2015 at 9:22:15 AM Jianshi Huang jianshi.hu...@gmail.com wrote: Hi, If I have a table in the Hive metastore saved as Parquet and I want to use it in Spark, it seems Spark will use Hive's Parquet serde to load the actual data. So is there any difference here? Will predicate pushdown, pruning and future Parquet optimizations in Spark SQL work when using the Hive serde? Thanks, -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
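To make the two read paths concrete, a small sketch (the table name and path are made up):

import org.apache.spark.sql.hive.HiveContext

val hc = new HiveContext(sc)
// Metastore route: Spark SQL swaps the Hive Parquet SerDe for its native
// Parquet scan, so predicate push-down and column pruning still apply
val viaMetastore = hc.sql("SELECT col1 FROM my_parquet_table WHERE col2 > 0")
// Direct route: read the Parquet files without going through the metastore
val direct = hc.parquetFile("/warehouse/my_parquet_table")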
Which OutputCommitter to use for S3?
Hi all, The default OutputCommitter used by RDDs, which is FileOutputCommitter, seems to require moving files at the commit step, which is not a constant-time operation in S3, as discussed in http://mail-archives.apache.org/mod_mbox/spark-user/201410.mbox/%3c543e33fa.2000...@entropy.be%3E. People seem to develop their own NullOutputCommitter implementation or use DirectFileOutputCommitter (as mentioned in SPARK-3595, https://issues.apache.org/jira/browse/SPARK-3595), but I wanted to check if there is a de facto standard, publicly available OutputCommitter to use for S3 in conjunction with Spark. Thanks, Mingyu
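For anyone wiring in such a committer, a sketch of the configuration side (the class name is a placeholder; neither Spark nor Hadoop ships a committer under that name):

// old mapred API property, as used by RDD.saveAsHadoopFile
sc.hadoopConfiguration.set("mapred.output.committer.class",
  "com.example.DirectOutputCommitter")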
Re: spark-local dir running out of space during long ALS run
spark.cleaner.ttl is not the right way; it seems to be really designed for streaming. Although it keeps the disk usage under control, it also causes loss of RDDs and broadcasts that are required later, leading to a crash. Is there any other way? Thanks, Antony. On Sunday, 15 February 2015, 21:42, Antony Mayi antonym...@yahoo.com wrote: spark.cleaner.ttl ? On Sunday, 15 February 2015, 18:23, Antony Mayi antonym...@yahoo.com wrote: Hi, I am running a bigger ALS job on Spark 1.2.0 on YARN (CDH 5.3.0). The ALS is using about 3 billion ratings and I am doing several trainImplicit() runs in a loop within one Spark session. I have a four-node cluster with 3TB disk space on each node. Before starting the job there is less than 8% of the disk space used. While the ALS is running I can see the disk usage rapidly growing, mainly because of files being stored under yarn/local/usercache/user/appcache/application_XXX_YYY/spark-local-ZZZ-AAA. After about 10 hours the disk usage hits 90% and YARN kills the particular containers. Am I missing some cleanup somewhere while looping over the several trainImplicit() calls? Taking 4*3TB of disk space seems immense. Thanks for any help, Antony.
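One approach often suggested for long lineages like this (a sketch; not a verified fix for this particular disk growth) is to set a checkpoint directory and explicitly drop model references between trainImplicit() runs, so the shuffle files become eligible for cleanup:

import org.apache.spark.mllib.recommendation.ALS

sc.setCheckpointDir("hdfs:///tmp/als-checkpoints") // assumed path

for (run <- 1 to numRuns) {
  val model = ALS.trainImplicit(ratings, rank, iterations, lambda, alpha)
  // ... use the model ...
  model.userFeatures.unpersist()
  model.productFeatures.unpersist()
}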
Re: Array in broadcast can't be serialized
Thanks Ted. After searching for a whole day, I still don't know how to make Spark use the twitter chill serialization; there are very few documents about how to integrate twitter chill into Spark for serialization. I tried the following, but an exception of java.lang.ClassCastException: com.twitter.chill.WrappedArraySerializer cannot be cast to org.apache.spark.serializer.Serializer was thrown:

val conf = new SparkConf()
  .setAppName("Test Serialization")
  .set("spark.serializer", "com.twitter.chill.WrappedArraySerializer")

Well, what is the correct way of configuring Spark to use the twitter chill serialization framework? 2015-02-15 22:23 GMT+08:00 Ted Yu yuzhih...@gmail.com: I was looking at https://github.com/twitter/chill It seems this would achieve what you want: chill-scala/src/main/scala/com/twitter/chill/WrappedArraySerializer.scala Cheers On Sat, Feb 14, 2015 at 6:36 PM, Tao Xiao xiaotao.cs@gmail.com wrote: I'm using Spark 1.1.0 and find that *ImmutableBytesWritable* can be serialized by Kryo but *Array[ImmutableBytesWritable]* can't be serialized even when I registered both of them in Kryo. The code is as follows:

val conf = new SparkConf()
  .setAppName("Hello Spark")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "xt.MyKryoRegistrator")
val sc = new SparkContext(conf)

val rdd = sc.parallelize(List(
  (new ImmutableBytesWritable(Bytes.toBytes("AAA")), new KeyValue()),
  (new ImmutableBytesWritable(Bytes.toBytes("BBB")), new KeyValue()),
  (new ImmutableBytesWritable(Bytes.toBytes("CCC")), new KeyValue()),
  (new ImmutableBytesWritable(Bytes.toBytes("DDD")), new KeyValue())), 4)

// snippet 1: a single object of ImmutableBytesWritable can be serialized in a broadcast
val partitioner = new SingleElementPartitioner(sc.broadcast(new ImmutableBytesWritable(Bytes.toBytes(3))))
val ret = rdd.aggregateByKey(List[KeyValue](), partitioner)(
  (xs: List[KeyValue], y: KeyValue) => y :: xs,
  (xs: List[KeyValue], ys: List[KeyValue]) => xs ::: ys).persist()
println("\n\n\nret.count = " + ret.count + ", partition size = " + ret.partitions.size)

// snippet 2: an array of ImmutableBytesWritable can not be serialized in a broadcast
val arr = Array(new ImmutableBytesWritable(Bytes.toBytes(1)), new ImmutableBytesWritable(Bytes.toBytes(2)), new ImmutableBytesWritable(Bytes.toBytes(3)))
val newPartitioner = new ArrayPartitioner(sc.broadcast(arr))
val ret1 = rdd.aggregateByKey(List[KeyValue](), newPartitioner)(
  (xs: List[KeyValue], y: KeyValue) => y :: xs,
  (xs: List[KeyValue], ys: List[KeyValue]) => xs ::: ys)
println("\n\n\nrdd2.count = " + ret1.count)

sc.stop

// the following are the kryo registrator and the partitioners
class MyKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[ImmutableBytesWritable]) // register ImmutableBytesWritable
    kryo.register(classOf[Array[ImmutableBytesWritable]]) // register Array[ImmutableBytesWritable]
  }
}

class SingleElementPartitioner(bc: Broadcast[ImmutableBytesWritable]) extends Partitioner {
  override def numPartitions: Int = 5
  def v = Bytes.toInt(bc.value.get)
  override def getPartition(key: Any): Int = v - 1
}

class ArrayPartitioner(bc: Broadcast[Array[ImmutableBytesWritable]]) extends Partitioner {
  val arr = bc.value
  override def numPartitions: Int = arr.length
  override def getPartition(key: Any): Int = Bytes.toInt(arr(0).get)
}

In the code above, snippet 1 works as expected. But snippet 2 throws Task not serializable: java.io.NotSerializableException: org.apache.hadoop.hbase.io.ImmutableBytesWritable.
So do I have to implement a Kryo serializer for Array[T] if it is used in a broadcast? Thanks
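On the configuration error above: spark.serializer must name a subclass of Spark's own org.apache.spark.serializer.Serializer, so a chill Kryo serializer cannot go there; that is exactly what the ClassCastException is saying. A sketch of the intended wiring, keeping KryoSerializer as the Spark serializer and doing the class-level registration inside the registrator:

import com.esotericsoftware.kryo.Kryo
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoRegistrator

class MyKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[ImmutableBytesWritable])
    // a custom serializer (e.g. one from com.twitter.chill) could be
    // passed as the second argument to kryo.register here
    kryo.register(classOf[Array[ImmutableBytesWritable]])
  }
}

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", classOf[MyKryoRegistrator].getName)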
Spark Streaming and SQL checkpoint error: (java.io.NotSerializableException: org.apache.hadoop.hive.conf.HiveConf)
I have a streaming application which registers a temp table on a HiveContext for each batch duration. The application runs well in Spark 1.1.0, but I get the below error from 1.1.1. Do you have any suggestions to resolve it? Thank you! java.io.NotSerializableException: org.apache.hadoop.hive.conf.HiveConf - field (class scala.Tuple2, name: _1, type: class java.lang.Object) - object (class scala.Tuple2, (Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml, org.apache.hadoop.hive.conf.LoopingByteArrayInputStream@2158ce23,org.apache.hadoop.hive.ql.session.SessionState@49b6eef9)) - field (class org.apache.spark.sql.hive.HiveContext, name: x$3, type: class scala.Tuple2) - object (class org.apache.spark.sql.hive.HiveContext, org.apache.spark.sql.hive.HiveContext@4e6e66a4) - field (class com.vitria.spark.streaming.api.scala.BaseQueryableDStream$$anonfun$registerTempTable$2, name: sqlContext$1, type: class org.apache.spark.sql.SQLContext) - object (class com.vitria.spark.streaming.api.scala.BaseQueryableDStream$$anonfun$registerTempTable$2, function1) - field (class org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1, name: foreachFunc$1, type: interface scala.Function1) - object (class org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1, function2) - field (class org.apache.spark.streaming.dstream.ForEachDStream, name: org$apache$spark$streaming$dstream$ForEachDStream$$foreachFunc, type: interface scala.Function2) - object (class org.apache.spark.streaming.dstream.ForEachDStream, org.apache.spark.streaming.dstream.ForEachDStream@5ccbdc20) - element of array (index: 0) - array (class [Ljava.lang.Object;, size: 16) - field (class scala.collection.mutable.ArrayBuffer, name: array, type: class [Ljava.lang.Object;) - object (class scala.collection.mutable.ArrayBuffer, ArrayBuffer(org.apache.spark.streaming.dstream.ForEachDStream@5ccbdc20)) - field (class org.apache.spark.streaming.DStreamGraph, name: outputStreams, type: class scala.collection.mutable.ArrayBuffer) - custom writeObject data (class org.apache.spark.streaming.DStreamGraph) - object (class org.apache.spark.streaming.DStreamGraph, org.apache.spark.streaming.DStreamGraph@776ae7da) - field (class org.apache.spark.streaming.Checkpoint, name: graph, type: class org.apache.spark.streaming.DStreamGraph) - root object (class org.apache.spark.streaming.Checkpoint, org.apache.spark.streaming.Checkpoint@5eade065) at java.io.ObjectOutputStream.writeObject0(Unknown Source)
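A pattern that often resolves this kind of trace (a sketch, not a verified fix for this exact application): avoid capturing the HiveContext in the DStream operations that get checkpointed, and instead create it lazily per batch. HiveContextSingleton is a made-up helper name:

import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.HiveContext

object HiveContextSingleton {
  @transient private var instance: HiveContext = _
  def getInstance(sc: SparkContext): HiveContext = synchronized {
    if (instance == null) instance = new HiveContext(sc)
    instance
  }
}

stream.foreachRDD { rdd =>
  val hiveContext = HiveContextSingleton.getInstance(rdd.sparkContext)
  // register the temp table for this batch here, using hiveContext
}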
Re: Writing to HDFS from spark Streaming
PS this is the real fix to this issue: https://issues.apache.org/jira/browse/SPARK-5795 I'd like to merge it as I don't think it breaks the API; it actually fixes it to work as intended. On Mon, Feb 16, 2015 at 3:25 AM, Bahubali Jain bahub...@gmail.com wrote: I used the latest assembly jar and the below as suggested by Akhil to fix this problem... temp.saveAsHadoopFiles("DailyCSV", ".txt", String.class, String.class, (Class) TextOutputFormat.class); Thanks All for the help! On Wed, Feb 11, 2015 at 1:38 PM, Sean Owen so...@cloudera.com wrote: That kinda dodges the problem by ignoring generic types. But it may be simpler than the 'real' solution, which is a bit ugly. (But first, to double-check: are you importing the correct TextOutputFormat? There are two versions. You use .mapred. with the old API and .mapreduce. with the new API.) Here's how I've formally casted around it in similar code: @SuppressWarnings("unchecked") Class<? extends OutputFormat<?,?>> outputFormatClass = (Class<? extends OutputFormat<?,?>>) (Class<?>) TextOutputFormat.class; and then pass that as the final argument. On Wed, Feb 11, 2015 at 6:35 AM, Akhil Das ak...@sigmoidanalytics.com wrote: Did you try: temp.saveAsHadoopFiles("DailyCSV", ".txt", String.class, String.class, (Class) TextOutputFormat.class); Thanks Best Regards On Wed, Feb 11, 2015 at 9:40 AM, Bahubali Jain bahub...@gmail.com wrote: Hi, I am facing issues while writing data from a streaming RDD to HDFS. JavaPairDStream<String, String> temp; ... ... temp.saveAsHadoopFiles("DailyCSV", ".txt", String.class, String.class, TextOutputFormat.class); I see compilation issues as below... The method saveAsHadoopFiles(String, String, Class<?>, Class<?>, Class<? extends OutputFormat<?,?>>) in the type JavaPairDStream<String,String> is not applicable for the arguments (String, String, Class<String>, Class<String>, Class<TextOutputFormat>) I see the same kind of problem even with the saveAsNewAPIHadoopFiles API. Thanks, Baahu -- Twitter: http://twitter.com/Baahu
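For what it's worth, the casting friction is specific to the Java API; in Scala the output format is passed as a type parameter instead, roughly as in this sketch (temp assumed to be a DStream of String pairs):

import org.apache.hadoop.mapred.TextOutputFormat

// temp: DStream[(String, String)]
temp.saveAsHadoopFiles[TextOutputFormat[String, String]]("DailyCSV", ".txt")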
Magic number 16: Why doesn't Spark Streaming process more than 16 files?
Hello, I have an application in Java that uses Spark Streaming 1.2.1 in the following manner:

- Listen to the input directory.
- If a new file is copied to that input directory, process it.
- Process: contact a RESTful web service (running also locally and responsive), send the contents of the file, receive the response from the web service, write the results as a new file into the output directory.
- Batch interval: 30 seconds
- Checkpoint interval: 150 seconds

When I test the application locally with 1 or 2 files, it works perfectly fine as expected. I run it like:

    spark-submit --class myClass --verbose --master local[4] --deploy-mode client myApp.jar /in file:///out

But then I noticed something strange when I copied 20 files to the INPUT directory: Spark Streaming detects all of the files, but it ends up processing *only 16 files*, and the remaining 4 are not processed at all. I tried it with 19, 18, and then 17 files. Same result: only 16 files end up in the output directory. Then I tried copying 16 files at once to the input directory, and it processed all 16. That's why I call it the magic number 16.

When I say it detects all of the files, I mean that in the logs I see the following lines when I copy 17 files:

===
2015-02-16 12:30:51 INFO SpotlightDriver:70 - spark.executor.memory: 1G
2015-02-16 12:30:51 WARN Utils:71 - Your hostname, emre-ubuntu resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface eth0)
2015-02-16 12:30:51 WARN Utils:71 - Set SPARK_LOCAL_IP if you need to bind to another address
2015-02-16 12:30:52 INFO Slf4jLogger:80 - Slf4jLogger started
2015-02-16 12:30:52 WARN NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2015-02-16 12:30:53 INFO WriteAheadLogManager for ReceivedBlockHandlerMaster:59 - Recovered 2 write ahead log files from file:/tmp/receivedBlockMetadata
2015-02-16 12:30:53 INFO WriteAheadLogManager for ReceivedBlockHandlerMaster:59 - Reading from the logs:
file:/tmp/receivedBlockMetadata/log-1424086110599-1424086170599
file:/tmp/receivedBlockMetadata/log-1424086200861-1424086260861
-------------------------------------------
Time: 142408626 ms
-------------------------------------------
2015-02-16 12:31:00 INFO WriteAheadLogManager for ReceivedBlockHandlerMaster:59 - Attempting to clear 0 old log files in file:/tmp/receivedBlockMetadata older than 142408596:
2015-02-16 12:31:00 INFO WriteAheadLogManager for ReceivedBlockHandlerMaster:59 - Cleared log files in file:/tmp/receivedBlockMetadata older than 142408596
2015-02-16 12:31:00 INFO WriteAheadLogManager for ReceivedBlockHandlerMaster:59 - Attempting to clear 0 old log files in file:/tmp/receivedBlockMetadata older than 142408596:
2015-02-16 12:31:00 INFO WriteAheadLogManager for ReceivedBlockHandlerMaster:59 - Cleared log files in file:/tmp/receivedBlockMetadata older than 142408596
2015-02-16 12:31:30 INFO FileInputFormat:280 - Total input paths to process : 1
2015-02-16 12:31:30 INFO FileInputFormat:280 - Total input paths to process : 1
2015-02-16 12:31:30 INFO FileInputFormat:280 - Total input paths to process : 1
2015-02-16 12:31:30 INFO FileInputFormat:280 - Total input paths to process : 1
2015-02-16 12:31:30 INFO FileInputFormat:280 - Total input paths to process : 1
2015-02-16 12:31:30 INFO FileInputFormat:280 - Total input paths to process : 1
2015-02-16 12:31:30 INFO FileInputFormat:280 - Total input paths to process : 1
2015-02-16 12:31:30 INFO FileInputFormat:280 - Total input paths to process : 1
2015-02-16 12:31:30 INFO FileInputFormat:280 - Total input paths to process : 1
2015-02-16 12:31:30 INFO FileInputFormat:280 - Total input paths to process : 1
2015-02-16 12:31:30 INFO FileInputFormat:280 - Total input paths to process : 1
2015-02-16 12:31:31 INFO FileInputFormat:280 - Total input paths to process : 1
2015-02-16 12:31:31 INFO FileInputFormat:280 - Total input paths to process : 1
2015-02-16 12:31:31 INFO FileInputFormat:280 - Total input paths to process : 1
2015-02-16 12:31:31 INFO FileInputFormat:280 - Total input paths to process : 1
2015-02-16 12:31:31 INFO FileInputFormat:280 - Total input paths to process : 1
2015-02-16 12:31:31 INFO FileInputFormat:280 - Total input paths to process : 1
2015-02-16 12:31:31 INFO WriteAheadLogManager for ReceivedBlockHandlerMaster:59 - Attempting to clear 0 old log files in file:/tmp/receivedBlockMetadata older than 142408599:
2015-02-16 12:31:31 INFO WriteAheadLogManager for ReceivedBlockHandlerMaster:59 - Cleared log files in file:/tmp/receivedBlockMetadata older than 142408599
-------------------------------------------
Time: 142408629 ms
-------------------------------------------
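For context, here is a minimal sketch of the kind of application described above. Every class, path, and variable name is a placeholder (none are from the original application), and the RESTful call is reduced to a stub:

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.streaming.Duration;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;

    public class DirectoryWatcher {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf().setAppName("DirectoryWatcher");
            // 30-second batch interval, as in the report.
            JavaStreamingContext jssc =
                new JavaStreamingContext(conf, new Duration(30 * 1000));
            jssc.checkpoint("file:///tmp/checkpoint");

            // Picks up files that appear in the input directory after the
            // stream starts.
            JavaDStream<String> lines = jssc.textFileStream(args[0]);
            // 150-second checkpoint interval, as in the report.
            lines.checkpoint(new Duration(150 * 1000));

            final String outDir = args[1];
            lines.foreachRDD(new Function<JavaRDD<String>, Void>() {
                public Void call(JavaRDD<String> rdd) {
                    if (rdd.count() > 0) {
                        // The real application would send each file's contents
                        // to the local web service and write its response; a
                        // plain save stands in for that here.
                        rdd.saveAsTextFile(outDir + "/batch-" + System.currentTimeMillis());
                    }
                    return null;
                }
            });

            jssc.start();
            jssc.awaitTermination();
        }
    }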