[General Question] [Hadoop + Spark at scale] Spark Rack Awareness?
I wanted to ask a general question about Hadoop/YARN and Apache Spark integration. I know that Hadoop on a physical cluster has rack awareness, i.e. it attempts to minimise network traffic by keeping replicated blocks within a rack. I wondered whether, when Spark is configured to use YARN as its cluster manager, it is able to use this feature to also minimise network traffic to a degree. Sorry if this question is not quite accurate, but I think you can generally see what I mean?
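P.S. To make the question a bit more concrete, this is the kind of check I was thinking of running in the spark-shell (where sc already exists) to see what locality information Spark actually gets back for an HDFS file. It is only a rough sketch and the path is just an example, not a real file of mine:

// rough sketch, run in spark-shell against any replicated HDFS file
val rdd = sc.textFile("hdfs://hc2nn.semtech-solutions.co.nz:8020/data/spark/example.txt")

// preferredLocations() lists the hosts Spark would prefer to schedule each
// partition on; these come from the HDFS block locations, which is where
// rack awareness feeds in on the Hadoop side
rdd.partitions.foreach { p =>
  println("partition " + p.index + " -> " + rdd.preferredLocations(p).mkString(","))
}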
[Spark 1.3.1] Spark HiveQL - CDH 5.3 Hive 0.13 UDFs
Hi, I have a five-node CDH 5.3 cluster running on CentOS 6.5, and I also have a separate install of Spark 1.3.1. (The CDH 5.3 install ships Spark 1.2 but I wanted a newer version.) I managed to write some Scala-based code using a HiveContext to connect to Hive and create/populate tables etc. I compiled my application with sbt and ran it with spark-submit in local mode. My question concerns UDFs, specifically the row_sequence function in the hive-contrib jar file, i.e.

hiveContext.sql("ADD JAR /opt/cloudera/parcels/CDH-5.3.3-1.cdh5.3.3.p0.5/jars/hive-contrib-0.13.1-cdh5.3.3.jar")
hiveContext.sql("CREATE TEMPORARY FUNCTION row_sequence as 'org.apache.hadoop.hive.contrib.udf.UDFRowSequence'")

val resRDD = hiveContext.sql("""
  SELECT row_sequence(), t1.edu
  FROM ( SELECT DISTINCT education AS edu FROM adult3 ) t1
  ORDER BY t1.edu
  """)

This seems to generate its sequence in the map (?) phase of execution, because no matter how I fiddle with the main SQL I cannot get an ascending index for the dimension data, i.e. I always get

1 val1
1 val2
1 val3

instead of

1 val1
2 val2
3 val3

I'm well aware that I can play with Scala and get around this issue, and I have, but I wondered whether others have come across this and solved it?

cheers

Mike F
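P.S. For reference, the kind of Scala workaround I mentioned looks roughly like this (an untested sketch that assumes the adult3 table above; it numbers the distinct education values with RDD.zipWithIndex instead of the Hive UDF):

// get the distinct dimension values as a plain RDD of strings
val eduRDD = hiveContext
  .sql("SELECT DISTINCT education AS edu FROM adult3 ORDER BY education")
  .rdd
  .map(row => row.getString(0))

// zipWithIndex assigns each value a 0-based index, so add 1 to start at 1
val indexedEdu = eduRDD.zipWithIndex().map { case (edu, idx) => (idx + 1, edu) }

indexedEdu.collect().foreach { case (idx, edu) => println(idx + " " + edu) }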
[Spark 1.3.1 SQL] Using Hive
Hi, is it true that if I want to use Spark SQL (for Spark 1.3.1) against Apache Hive I need to build Spark from source? I'm using CDH 5.3 on CentOS Linux 6.5, which uses Hive 0.13.0 (I think).

cheers

Mike F
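P.S. For what it's worth, the build I had in mind, based on the Spark 1.3 building instructions, would be roughly the following; the -Dhadoop.version value for CDH 5.3 is my guess rather than something I have verified:

mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.5.0-cdh5.3.3 -Phive -Phive-thriftserver -DskipTests clean package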
Spark Twitter streaming question
Hi, I have a question about Spark Twitter stream processing in Spark 1.3.1. The code sample below just opens up a Twitter stream, uses auth keys, splits out hash tags and creates a temp table. However, when I try to compile it using sbt (CentOS 6.5) I get the error

[error] /home/hadoop/spark/twitter1/src/main/scala/twitter1.scala:54: value toDF is not a member of org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]
[error]       val dfHashTags = rdd.map(hashT => Row(hashT) ).toDF()

I know that I need to import sqlContext.implicits._, which is what I've tried, but I still get the error. Can anyone advise?

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.streaming._
import org.apache.spark.streaming.twitter._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.sql._
import org.apache.spark.sql.types.{StructType, StructField, StringType}

object twitter1 {

  def main(args: Array[String]) {

    // create a spark conf and context
    val appName = "Twitter example 1"
    val conf = new SparkConf()
    conf.setAppName(appName)
    val sc = new SparkContext(conf)

    // set twitter auth key values
    val consumerKey       = "QQxxx"
    val consumerSecret    = "0HFxxx"
    val accessToken       = "32394xxx"
    val accessTokenSecret = "IlQvscxxx"

    // set twitter auth properties
    // https://apps.twitter.com/
    System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
    System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
    System.setProperty("twitter4j.oauth.accessToken", accessToken)
    System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)

    val ssc = new StreamingContext(sc, Seconds(5))
    val stream = TwitterUtils.createStream(ssc, None)

    val hashTags = stream.flatMap( status => status.getText.split(" ").filter(_.startsWith("#")))

    //
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._

    hashTags.foreachRDD { rdd =>
      val dfHashTags = rdd.map(hashT => Row(hashT) ).toDF()
      dfHashTags.registerTempTable("tweets")
    }

    // extra stuff here

    ssc.start()
    ssc.awaitTermination()

  } // end main

} // end twitter1

cheers

Mike F
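P.S. One variant I have been experimenting with instead of calling toDF() on the RDD[Row] is to build the DataFrame explicitly with createDataFrame and a one-column schema. This is only a rough sketch and I am not sure it is the right approach:

hashTags.foreachRDD { rdd =>
  // explicit schema for the single hash tag column
  val schema = StructType(Array(StructField("hashtag", StringType, true)))
  val rowRDD = rdd.map(hashT => Row(hashT))
  val dfHashTags = sqlContext.createDataFrame(rowRDD, schema)
  dfHashTags.registerTempTable("tweets")
}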
Spark SQL and CSV data processing question
Hi, I'm getting the following error when trying to process a CSV-based data file.

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 10.0 failed 4 times, most recent failure: Lost task 1.3 in stage 10.0 (TID 262, hc2r1m3.semtech-solutions.co.nz): java.lang.ArrayIndexOutOfBoundsException: 0
    at org.apache.spark.sql.catalyst.expressions.GenericRow.apply(Row.scala:142)
    at org.apache.spark.sql.catalyst.expressions.BoundReference.eval(BoundAttribute.scala:37)
    at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:68)
    at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:52)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)

I have made sure that none of my data rows are empty and that they all have 15 fields. I have also physically checked the data. The error occurs when I run the actual Spark SQL on the last line. The script is as follows.

val server = "hdfs://hc2nn.semtech-solutions.co.nz:8020"
val path   = "/data/spark/h2o/"

val train_csv = server + path + "adult.train.data" // 32,562 rows
val test_csv  = server + path + "adult.test.data"  // 16,283 rows

// load the data
val rawTrainData = sparkCxt.textFile(train_csv)
val rawTestData  = sparkCxt.textFile(test_csv)

// create a spark sql schema for the row
val schemaString = "age workclass fnlwgt education educationalnum maritalstatus " +
                   "occupation relationship race gender capitalgain capitalloss " +
                   "hoursperweek nativecountry income"

val schema = StructType( schemaString.split(" ")
    .map(fieldName => StructField(fieldName, StringType, false)))

// create an RDD from the raw training data
val trainRDD = rawTrainData
    .filter(!_.isEmpty)
    .map(rawRow => Row.fromSeq(rawRow.split(",")
    .filter(_.length == 15)
    .map(_.toString).map(_.trim) ))

println( "Raw Training Data Count = " + trainRDD.count() )

val testRDD = rawTestData
    .filter(!_.isEmpty)
    .map(rawRow => Row.fromSeq(rawRow.split(",")
    .filter(_.length == 15)
    .map(_.toString).map(_.trim) ))

println( "Raw Testing Data Count = " + testRDD.count() )

// create a schema RDD
val trainSchemaRDD = sqlContext.applySchema(trainRDD, schema)
val testSchemaRDD  = sqlContext.applySchema(testRDD, schema)

// register schema RDD as a table
trainSchemaRDD.registerTempTable("trainingTable")
testSchemaRDD.registerTempTable("testingTable")

println( "Schema RDD Training Data Count = " + trainSchemaRDD.count() )
println( "Schema RDD Testing Data Count = " + testSchemaRDD.count() )

// now run sql against the table to filter the data
val schemaRddTrain = sqlContext.sql(
  "SELECT " +
  "age,workclass,education,maritalstatus,occupation,relationship,race," +
  "gender,hoursperweek,nativecountry,income " +
  "FROM trainingTable LIMIT 5000")

println( "Training Data Count = " + schemaRddTrain.count() )

Any advice is appreciated :)
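P.S. For comparison, the sort of change I have been considering is to check the number of fields per line before building the Row, rather than filtering the split fields themselves. This is a rough sketch for the training RDD only, so it may not address the actual cause:

val trainRDD = rawTrainData
    .filter(!_.isEmpty)
    .map(rawRow => rawRow.split(",").map(_.trim))
    .filter(cols => cols.length == 15)   // keep only lines with all 15 fields
    .map(cols => Row.fromSeq(cols))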