Re: Quality of documentation (rant)

2014-01-19 Thread Matt Massie
Debasish-

Just wanted to let you know that using Parquet with Spark is really easy to
do. Take a look at
http://zenfractal.com/2013/08/21/a-powerful-big-data-trio/ for an example.
Parquet provides a Hadoop InputFormat for reading data and includes support for
predicate pushdown and projection.
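
A minimal sketch of that pattern, assuming a hypothetical Avro-generated class
MyRecord and a placeholder path (the read-support wiring follows the linked post;
projection and predicate pushdown would likewise be configured on the Job):

import org.apache.hadoop.mapreduce.Job
import org.apache.spark.SparkContext
import parquet.avro.AvroReadSupport
import parquet.hadoop.ParquetInputFormat

object ParquetReadSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "Parquet Read Sketch")
    val job = new Job()
    // Materialize Parquet records as Avro objects (MyRecord is hypothetical).
    ParquetInputFormat.setReadSupportClass(job, classOf[AvroReadSupport[MyRecord]])
    val records = sc.newAPIHadoopFile(
      "hdfs:///data/my-records.parquet",      // placeholder path
      classOf[ParquetInputFormat[MyRecord]],
      classOf[Void],
      classOf[MyRecord],
      job.getConfiguration)
    println(records.map(_._2).count())
  }
}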



--
Matt Massie
UC Berkeley AMPLab
Twitter: @matt_massie https://twitter.com/matt_massie,
@amplab https://twitter.com/amplab
https://amplab.cs.berkeley.edu/


On Sun, Jan 19, 2014 at 8:41 AM, Debasish Das debasish.da...@gmail.com wrote:

 Hi Ognen,

 We have been running HDFS, YARN, and Spark on 20 beefy nodes. I give half
 of the cores to Spark and use the rest for YARN MR. To optimize network
 transfer during RDD creation, it is better to have Spark run on all of the
 HDFS nodes.

 For preprocessing the data for the algorithms I use a YARN MR app, since the
 input data can be stored in various formats that Spark does not support yet
 (things like Parquet) but which platform people like for various reasons,
 such as data compression. Once the preprocessor saves the data on HDFS as a
 text file or sequence file, Spark gives you orders of magnitude better
 runtime than the YARN algorithm.
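
 For illustration, a minimal sketch of that handoff (paths and writable types
 are assumptions): the MR preprocessor writes text or sequence files to HDFS,
 and Spark reads them directly.

 import org.apache.hadoop.io.{LongWritable, Text}
 import org.apache.spark.SparkContext

 object ReadPreprocessedSketch {
   def main(args: Array[String]) {
     val sc = new SparkContext("local", "Read Preprocessed Data")
     // Text output from the preprocessor:
     val lines = sc.textFile("hdfs:///data/preprocessed/part-*")
     // Sequence-file output, using whatever writable types the preprocessor wrote:
     val pairs = sc.sequenceFile("hdfs:///data/preprocessed-seq",
       classOf[LongWritable], classOf[Text])
     println(lines.count() + " lines, " + pairs.count() + " records")
   }
 }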

 I have benchmarked ALS and could run the dataset in 14 minutes for 10
 iterations, while the scalable ALS algorithm from Cloudera Oryx ran 6
 iterations in an hour. Note that they are supposedly implementing the same
 ALS paper. On the same dataset, Mahout ALS fails because it needs more
 memory than the 6 GB that YARN uses by default. I still have to look into
 the results in more detail, and into the code, to be sure what they are
 doing.
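
 For illustration, a minimal sketch of an ALS run of that shape using MLlib
 (the input path, field layout, rank, and iteration count are assumptions,
 not the actual benchmark setup):

 import org.apache.spark.SparkContext
 import org.apache.spark.mllib.recommendation.{ALS, Rating}

 object AlsBenchmarkSketch {
   def main(args: Array[String]) {
     val sc = new SparkContext("local", "ALS Benchmark Sketch")
     // Assumed input format: one "userId,productId,rating" triple per line.
     val ratings = sc.textFile("hdfs:///data/ratings.csv").map { line =>
       val Array(user, product, rating) = line.split(",")
       Rating(user.toInt, product.toInt, rating.toDouble)
     }
     // Rank 10 is an assumption; 10 iterations matches the run described above.
     val model = ALS.train(ratings, 10, 10)
     println(model.predict(1, 1))
   }
 }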

 Note that Mahout algorithms are not optimized for YARN yet, and the master
 Mahout branch is broken for YARN. Thanks to Cloudera's help, we could patch
 it up. The number of YARN algorithms is not very high right now.

 CDH 5.0 is integrating Spark with Cloudera Manager, similar to what they
 did with Solr. It should be released by March 2014; the beta is already
 available. It will definitely ease the process of making Spark operational.

 I have not tested my setup on EC2 (it runs on an internal Hadoop cluster),
 but for that I will most likely use Cloudera Manager from the 5.0 beta. I
 will update you with more on the EC2 experience.

 Thanks.
 Deb
 On Jan 19, 2014 6:53 AM, Ognen Duzlevski og...@nengoiksvelzud.com
 wrote:

 On Sun, Jan 19, 2014 at 2:49 PM, Ognen Duzlevski 
 og...@nengoiksvelzud.com wrote:


 My basic requirement is to set everything up myself and understand it.
 For testing purposes my cluster has 15 xlarge instances, and I guess I will
 just set up a Hadoop cluster running over these instances for the purpose
 of getting the benefits of HDFS. I would then set up HDFS over S3 with
 blocks.


 By this I mean I would set up a Hadoop cluster running in parallel on the
 same instances just for the purpose of running Spark over HDFS. Is this a
 reasonable approach? What kind of performance penalty (memory, CPU
 cycles) am I going to incur from the Hadoop daemons running just for this
 purpose?

 Thanks!
 Ognen




Re: Spark Avro in Scala

2013-11-18 Thread Matt Massie
Agree with Eugen that you should use Kryo.

But even better is to embed your Avro objects inside of Kryo. This allows
you to have the benefits of both Avro and Kryo.

Here's example code for using Avro with Kryo.

https://github.com/massie/adam/blob/master/adam-commands/src/main/scala/edu/berkeley/cs/amplab/adam/serialization/AdamKryoRegistrator.scala

You need to register all of your Avro specific classes with Kryo and have it
use the AvroSerializer class to encode/decode them.

e.g.

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

class AdamKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    // AvroSerializer is the Kryo serializer from the linked ADAM example;
    // MySpecificAvroClass stands in for your Avro-generated specific class.
    kryo.register(classOf[MySpecificAvroClass],
      new AvroSerializer[MySpecificAvroClass]())
  }
}
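
The registrator then needs to be wired into Spark before the SparkContext is
created. A minimal sketch, following the Spark 0.8-era tuning guide (the
fully qualified registrator name below is a placeholder; use your own package):

import org.apache.spark.SparkContext

object KryoSetupSketch {
  def main(args: Array[String]) {
    // Set these properties before creating the SparkContext.
    System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    System.setProperty("spark.kryo.registrator", "com.example.AdamKryoRegistrator") // placeholder
    val sc = new SparkContext("local", "Avro with Kryo")
  }
}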

--
Matt Massie
UC Berkeley AMPLab
Twitter: @matt_massie https://twitter.com/matt_massie,
@amplab https://twitter.com/amplab
https://amplab.cs.berkeley.edu/


On Mon, Nov 18, 2013 at 10:45 AM, Eugen Cepoi cepoi.eu...@gmail.com wrote:

 Hi Robert,

 The problem is that Spark uses Java serialization, which requires serialized
 objects to implement Serializable, and AvroKey doesn't.
 As a workaround you can try using Kryo
 (http://spark.incubator.apache.org/docs/latest/tuning.html#data-serialization)
 for the serialization.

 Eugen


 2013/11/11 Robert Fink ursula2...@gmail.com

 Hi,

 I am trying to get the following minimal Scala example to work: using Spark
 to process Avro records. Here's my dummy Avro definition:

 {
   "namespace": "com.avrotest",
   "type": "record",
   "name": "AvroTest",
   "fields": [
     {"name": "field1", "type": ["string", "null"]}
   ]
 }

 I experiment with a simple job that creates three AvroTest objects, writes
 them out to a file through a SparkContext, and then reads the generated Avro
 file back in and performs a simple grouping operation:

 // -------------------------------------------------------------------------
 import org.apache.spark.SparkContext
 import org.apache.spark.SparkContext._
 import org.apache.avro.specific.SpecificDatumWriter
 import org.apache.avro.file.DataFileWriter
 import org.apache.avro._
 import org.apache.avro.generic._
 import org.apache.hadoop.mapreduce.Job
 import com.avrotest.AvroTest
 import java.io.File

 object SparkTest {
   def main(args: Array[String]) {

     def avrofile = "output.avro"
     def sc = new SparkContext("local", "Simple App")
     val job = new Job()

     // Write three AvroTest records to an Avro data file.
     val record1 = new AvroTest()
     record1.setField1("value1")
     val record2 = new AvroTest()
     record2.setField1("value1")
     val record3 = new AvroTest()
     record3.setField1("value2")

     def userDatumWriter = new SpecificDatumWriter[AvroTest]()
     val dataFileWriter = new DataFileWriter[AvroTest](userDatumWriter)
     def file = new File(avrofile)
     dataFileWriter.create(record1.getSchema(), file)
     dataFileWriter.append(record1)
     dataFileWriter.append(record2)
     dataFileWriter.append(record3)
     dataFileWriter.close()

     // Read the Avro file back as an RDD of (AvroKey, NullWritable) pairs.
     def rdd = sc.newAPIHadoopFile(
       avrofile,
       classOf[org.apache.avro.mapreduce.AvroKeyInputFormat[AvroTest]],
       classOf[org.apache.avro.mapred.AvroKey[AvroTest]],
       classOf[org.apache.hadoop.io.NullWritable],
       job.getConfiguration)
     // rdd.foreach( x => println(x._1.datum.getField1) )
     // Prints value1, value1, value2
     val numGroups = rdd.groupBy(x => x._1.datum.getField1).count()
   }
 }
 // -------------------------------------------------------------------------

 I would expect numGroups == 2 in the last step, because record1 and record2
 share getField1() == "value1", and record3 has getField1() == "value2".
 However, the script fails to execute with the following error (see below).
 Can anyone give me a hint about what could be wrong in the above code, or
 post an example of reading from an Avro file and performing some simple
 computations on the retrieved objects? Thank you so much! Robert.

 11650 [pool-109-thread-1] WARN
 org.apache.avro.mapreduce.AvroKeyInputFormat - Reader schema was not set.
 Use AvroJob.setInputKeySchema() if desired.
 11661 [pool-109-thread-1] INFO
 org.apache.avro.mapreduce.AvroKeyInputFormat - Using a reader schema equal
 to the writer schema.
 12293 [spark-akka.actor.default-dispatcher-5] INFO
 org.apache.spark.scheduler.local.LocalTaskSetManager - Loss was due to
 java.io.NotSerializableException
 java.io.NotSerializableException: org.apache.avro.mapred.AvroKey
 at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
 at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
 at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
 at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
 at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
 at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
 at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508