[General Question] [Hadoop + Spark at scale] Spark Rack Awareness ?

2015-07-18 Thread Mike Frampton
I wanted to ask a general question about Hadoop/YARN and Apache Spark integration. I know that
Hadoop on a physical cluster has rack awareness, i.e. it attempts to minimise network traffic
by keeping replicated blocks within a rack.

I wondered whether, when Spark is configured to use YARN as its cluster manager, it is able to
use this feature to minimise network traffic to some degree. A sketch of the kind of
configuration I have in mind is below.

Sorry if this question is not quite precise, but I think you can generally see what I mean?
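
For reference, here is roughly what I mean (a sketch only; the property names are the standard
Hadoop and Spark ones, the values are arbitrary). Rack topology itself is configured on the
Hadoop side via the usual net.topology.script.file.name script in core-site.xml, while on the
Spark side the scheduler's willingness to wait for rack-local task placement can be tuned with
the spark.locality.wait settings:

  import org.apache.spark.{SparkConf, SparkContext}

  // Sketch only: standard Spark locality settings, values (in ms) chosen arbitrarily.
  val conf = new SparkConf()
    .setAppName("rack locality example")
    .set("spark.locality.wait", "3000")       // wait before dropping down a locality level
    .set("spark.locality.wait.rack", "3000")  // wait specifically for RACK_LOCAL placement

  val sc = new SparkContext(conf)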
  

[Spark 1.3.1] Spark HiveQL - CDH 5.3 Hive 0.13 UDF's

2015-06-26 Thread Mike Frampton
Hi 

I have a five node CDH 5.3 cluster running on CentOS 6.5, plus a separate
install of Spark 1.3.1. (The CDH 5.3 install ships Spark 1.2 but I wanted a
newer version.)

I managed to write some Scala-based code using a HiveContext to connect to Hive and
create/populate tables etc. I compiled my application with sbt and ran it with spark-submit
in local mode.

My question concerns UDFs, specifically the row_sequence function in the hive-contrib
jar file, i.e.

  hiveContext.sql(
    "ADD JAR /opt/cloudera/parcels/CDH-5.3.3-1.cdh5.3.3.p0.5/jars/hive-contrib-0.13.1-cdh5.3.3.jar"
  )

  hiveContext.sql(
    "CREATE TEMPORARY FUNCTION row_sequence as 'org.apache.hadoop.hive.contrib.udf.UDFRowSequence';"
  )

  val resRDD = hiveContext.sql("""
    SELECT row_sequence(), t1.edu FROM
      ( SELECT DISTINCT education AS edu FROM adult3 ) t1
    ORDER BY t1.edu
  """)

This seems to generate its sequence within each map task (?) because, no matter how I fiddle
with the main SQL, I cannot get an ascending index for the dimension data, i.e. I always get

1  val1
1  val2
1  val3

instead of 

1  val1
2  val2
3  val3

I'm well aware that I can work around this in Scala, and I have (a sketch of what I mean
follows), but I wondered whether others have come across this and solved it?
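
For reference, the kind of Scala workaround I mean is roughly this (a sketch only, using
zipWithIndex rather than the UDF, with the same table and column names as the query above):

  // Sketch: derive the ascending index with zipWithIndex instead of row_sequence().
  val eduRDD = hiveContext.sql(
      "SELECT DISTINCT education AS edu FROM adult3 ORDER BY edu"
    ).rdd.map(_.getString(0))

  // zipWithIndex assigns a stable, 0-based index across partitions
  val indexed = eduRDD.zipWithIndex().map { case (edu, idx) => (idx + 1, edu) }

  indexed.collect().foreach { case (i, edu) => println(i + "  " + edu) }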

cheers

Mike F

[Spark 1.3.1 SQL] Using Hive

2015-06-21 Thread Mike Frampton
Hi 

Is it true that if I want to use Spark SQL (in Spark 1.3.1) against Apache
Hive, I need to build Spark from source?

I'm using CDH 5.3 on CentOS Linux 6.5, which uses Hive 0.13.0 (I think).
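
If it matters, the kind of build I am looking at is the standard Maven build with the Hive
profiles enabled, something along the lines of the following (exact profile names to be
confirmed against the 1.3.1 build docs):

  build/mvn -Pyarn -Phive -Phive-thriftserver -DskipTests clean package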

cheers

Mike F

  

spark stream twitter question ..

2015-06-13 Thread Mike Frampton
Hi 

I have a question about Twitter stream processing in Spark 1.3.1. The code sample below just
opens a Twitter stream, sets the auth keys, splits out hash tags and creates a temp table.
However, when I try to compile it with sbt (on CentOS 6.5) I get the error:

[error] /home/hadoop/spark/twitter1/src/main/scala/twitter1.scala:54: value
toDF is not a member of org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]
[error]   val dfHashTags = rdd.map(hashT => Row(hashT) ).toDF()

I know that I need to import sqlContext.implicits._, which is what I've tried, but I still get
the error. Can anyone advise? (A sketch of an alternative I have been considering is shown
after the listing.)

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.streaming._
import org.apache.spark.streaming.twitter._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.sql._
import org.apache.spark.sql.types.{StructType,StructField,StringType}


object twitter1 {

  def main(args: Array[String]) {

    // create a Spark conf and context

    val appName = "Twitter example 1"
    val conf    = new SparkConf()

    conf.setAppName(appName)

    val sc = new SparkContext(conf)

    // set Twitter auth key values

    val consumerKey       = "QQxxx"
    val consumerSecret    = "0HFxxx"
    val accessToken       = "32394xxx"
    val accessTokenSecret = "IlQvscxxx"

    // set Twitter auth properties
    //   https://apps.twitter.com/

    System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
    System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
    System.setProperty("twitter4j.oauth.accessToken", accessToken)
    System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)

    val ssc    = new StreamingContext(sc, Seconds(5))
    val stream = TwitterUtils.createStream(ssc, None)

    val hashTags = stream.flatMap( status => status.getText.split(" ").filter(_.startsWith("#")) )

    //  val sqlContext = new org.apache.spark.sql.SQLContext(sc)

    import sqlContext.implicits._

    hashTags.foreachRDD { rdd =>
      val dfHashTags = rdd.map(hashT => Row(hashT)).toDF()
      dfHashTags.registerTempTable("tweets")
    }

    // extra stuff here

    ssc.start()
    ssc.awaitTermination()

  } // end main

} // end twitter1
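
For what it's worth, here is a sketch of the alternative I have been considering but am not
sure about: creating the SQLContext up front and mapping to a tuple rather than a Row, since
the implicit toDF conversions seem to apply to RDDs of case classes and tuples rather than
RDD[Row]:

    // Sketch only: tuple-based variant of the foreachRDD block above.
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._

    hashTags.foreachRDD { rdd =>
      val dfHashTags = rdd.map(hashT => Tuple1(hashT)).toDF("hashtag")
      dfHashTags.registerTempTable("tweets")
    }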

cheers

Mike F

  

Spark sql and csv data processing question

2015-05-15 Thread Mike Frampton
Hi 

I'm getting the following error when trying to process a CSV-based data file.

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to 
stage failure: Task 1 in stage 10.0 failed 4 times, most recent failure: Lost 
task 1.3 in stage 10.0 (TID 262, hc2r1m3.semtech-solutions.co.nz): 
java.lang.ArrayIndexOutOfBoundsException: 0
at 
org.apache.spark.sql.catalyst.expressions.GenericRow.apply(Row.scala:142)
at 
org.apache.spark.sql.catalyst.expressions.BoundReference.eval(BoundAttribute.scala:37)
at 
org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:68)
at 
org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:52)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)

I have made sure that none of my data rows are empty and that they all have 15 fields. I have
also physically checked the data. The error occurs when I run the actual Spark SQL on the last
line. The script is as follows.

  val server = "hdfs://hc2nn.semtech-solutions.co.nz:8020"
  val path   = "/data/spark/h2o/"

  val train_csv = server + path + "adult.train.data" // 32,562 rows
  val test_csv  = server + path + "adult.test.data"  // 16,283 rows

  // load the data

  val rawTrainData = sparkCxt.textFile(train_csv)
  val rawTestData  = sparkCxt.textFile(test_csv)

  // create a spark sql schema for the row

  val schemaString = "age workclass fnlwgt education educationalnum maritalstatus " +
                     "occupation relationship race gender capitalgain capitalloss " +
                     "hoursperweek nativecountry income"

  val schema = StructType( schemaString.split(" ")
      .map(fieldName => StructField(fieldName, StringType, false)))

  // create an RDD from the raw training data

  val trainRDD = rawTrainData
                   .filter(!_.isEmpty)
                   .map(rawRow => Row.fromSeq(rawRow.split(",")
                   .filter(_.length == 15)
                   .map(_.toString).map(_.trim) ))

  println( "Raw Training Data Count = " + trainRDD.count() )

  val testRDD  = rawTestData
                   .filter(!_.isEmpty)
                   .map(rawRow => Row.fromSeq(rawRow.split(",")
                   .filter(_.length == 15)
                   .map(_.toString).map(_.trim) ))

  println( "Raw Testing Data Count = " + testRDD.count() )

  // create a schema RDD

  val trainSchemaRDD = sqlContext.applySchema(trainRDD, schema)
  val testSchemaRDD  = sqlContext.applySchema(testRDD,  schema)

  // register schema RDD as a table

  trainSchemaRDD.registerTempTable("trainingTable")
  testSchemaRDD.registerTempTable("testingTable")

  println( "Schema RDD Training Data Count = " + trainSchemaRDD.count() )
  println( "Schema RDD Testing Data Count  = " + testSchemaRDD.count() )

  // now run sql against the table to filter the data

  val schemaRddTrain = sqlContext.sql(
    "SELECT " +
    " age,workclass,education,maritalstatus,occupation,relationship,race," +
    " gender,hoursperweek,nativecountry,income " +
    "FROM trainingTable LIMIT 5000")

  println( "Training Data Count = " + schemaRddTrain.count() )

Any advice is appreciated :)
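
For reference, an explicit per-row check of the field count (the 15 fields I mention above)
would look roughly like this sketch, applied to the rawTrainData RDD from the script:

  // Sketch only: keep rows whose comma-split field count is 15,
  // trim each field, then build the Row.
  val checkedTrainRDD = rawTrainData
    .filter(!_.isEmpty)
    .map(_.split(",").map(_.trim))
    .filter(_.length == 15)
    .map(fields => Row.fromSeq(fields))

  println( "Checked Training Data Count = " + checkedTrainRDD.count() )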