----------------------------------------------
Code
----------------------------------------------
scala> import org.apache.spark.SparkContext._
import org.apache.spark.SparkContext._
scala> import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.RDD
scala> import org.apache.spark.sql.SchemaRDD
import org.apache.spark.sql.SchemaRDD
scala> import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.HiveContext
scala> import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.{SparkConf, SparkContext}
scala> val hiveContext: HiveContext = new HiveContext(sc)
hiveContext: org.apache.spark.sql.hive.HiveContext =
org.apache.spark.sql.hive.HiveContext@2de76244
scala> val numDays = 2
numDays: Int = 2
scala> case class Click(
     |   /* about 20 fields of type STRING */
     | )
defined class Click
scala> val inputRDD = new Array[SchemaRDD](numDays)
inputRDD: Array[org.apache.spark.sql.SchemaRDD] = Array(null, null)
scala> for (i <- 1 to numDays) {
     |   if (i < 10) {
     |     inputRDD(i - 1) = hiveContext.parquetFile("hdfs://................" + i)
     |   } else {
     |     inputRDD(i - 1) = hiveContext.parquetFile("hdfs://................" + i)
     |   }
     | }
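Note that both branches of the if above are identical, so the i < 10 test has no effect as pasted; presumably the real code zero-pads single-digit day numbers in the path. A minimal sketch of that likely intent (basePath is a hypothetical placeholder, since the real HDFS path is redacted above):

val basePath = "hdfs://..."  // hypothetical placeholder for the redacted path
for (i <- 1 to numDays) {
  val day = if (i < 10) "0" + i else i.toString  // zero-pad days 1..9
  inputRDD(i - 1) = hiveContext.parquetFile(basePath + day)
}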
scala> var unionRDD = inputRDD(0)  // seed with the first day; the loop below unions the rest
unionRDD: org.apache.spark.sql.SchemaRDD =
SchemaRDD[1] at RDD at SchemaRDD.scala:104
scala> for (i <- 1 to inputRDD.length - 1) {
     |   unionRDD = unionRDD.unionAll(inputRDD(i))
     | }
scala> val inputRDD = unionRDD
inputRDD: org.apache.spark.sql.SchemaRDD =
SchemaRDD[2] at RDD at SchemaRDD.scala:104
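A var-free equivalent of the loop above, assuming at least one day was loaded (dailyRDDs is a hypothetical name for the Array[SchemaRDD] before it is shadowed by the val on the previous line):

// Same result as the var + loop: fold all daily SchemaRDDs into one.
val combined: SchemaRDD = dailyRDDs.reduce((a, b) => a.unionAll(b))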
scala>
scala> inputRDD.registerTempTable("urlInfo")
scala> val clickstreamRDD = hiveContext.sql("select * from urlInfo " +
     |   "where guid regexp '^[0-9a-f-]{36}$' " +
     |   "AND ((callerid > 3 AND callerid < 10000) OR callerid > 100000 " +
     |   "OR (callerid = 3 AND browsertype = 'IE')) " +
     |   "AND countrycode regexp '^[A-Z]{2}$'")
clickstreamRDD: org.apache.spark.sql.SchemaRDD =
SchemaRDD[3] at RDD at SchemaRDD.scala:104
scala>
scala> clickstreamRDD.registerTempTable("clickstream")
scala> clickstreamRDD.cache()
res4: clickstreamRDD.type =
SchemaRDD[3] at RDD at SchemaRDD.scala:104
scala> val guidClickRDD = clickstreamRDD.map(row => (row(7).asInstanceOf[String], {
     |   val value = Click(row(0).asInstanceOf[String], row(1).asInstanceOf[String],
     |     row(2).asInstanceOf[String], row(3).asInstanceOf[String],
     |     row(4).asInstanceOf[String], row(5).asInstanceOf[String],
     |     row(6).asInstanceOf[String], row(7).asInstanceOf[String],
     |     row(8).asInstanceOf[String], row(9).asInstanceOf[String],
     |     row(10).asInstanceOf[String], row(11).asInstanceOf[String],
     |     row(12).asInstanceOf[String], row(13).asInstanceOf[String],
     |     row(14).asInstanceOf[String], row(15).asInstanceOf[String],
     |     row(16).asInstanceOf[String], row(17).asInstanceOf[String],
     |     row(18).asInstanceOf[String], row(19).asInstanceOf[String])
     |   value
     | }))
guidClickRDD: org.apache.spark.rdd.RDD[(String, Click)] = MappedRDD[14] at
map at <console>:25
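The twenty hand-typed asInstanceOf casts can be generated instead; Spark SQL 1.x Row also exposes getString. A sketch of the same guid/Click pairing, assuming all 20 columns really are strings:

val guidClickRDD: RDD[(String, Click)] = clickstreamRDD.map { row =>
  val f = (0 until 20).map(row.getString)  // pull all 20 string columns once
  (f(7), Click(f(0), f(1), f(2), f(3), f(4), f(5), f(6), f(7), f(8), f(9),
               f(10), f(11), f(12), f(13), f(14), f(15), f(16), f(17), f(18), f(19)))
}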
scala> val blackList: RDD[(String, Click)] =
     |   guidClickRDD.groupByKey().filter(row => row._2.size == 1).map(row =>
     |     (row._1, Click("", "", "", "", "", "", "", "", "", "",
     |       "", "", "", "", "", "", "", "", "", "")))
blackList: org.apache.spark.rdd.RDD[(String, Click)] = MappedRDD[27] at map
at <console>:27
scala> val guidClickFRDD = guidClickRDD.subtractByKey(blackList)
guidClickFRDD: org.apache.spark.rdd.RDD[(String, Click)] = SubtractedRDD[28]
at subtractByKey at <console>:29
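A side note that may bear on the lost-executor errors in the subject line: groupByKey pulls every click for a guid onto one executor just to take a size. The blacklist only needs a count, so reduceByKey keeps the shuffle small; a sketch (the subtractByKey step stays the same):

// Guids seen exactly once, counted without materializing the groups.
val singletonGuids: RDD[(String, Int)] =
  guidClickRDD.mapValues(_ => 1).reduceByKey(_ + _).filter(_._2 == 1)
val guidClickFRDD = guidClickRDD.subtractByKey(singletonGuids)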
scala> guidClickFRDD.reduceByKey((x, y) => {
     |   /* commutative and associative function */
     |   Click("US", "US", "US", "US", "US", "US", "US", "US", "US", "US",
     |     "US", "US", "US", "US", "US", "US", "US", "US", "US", "US")
     | }).take(200).foreach(println)
----------------------------------------------
EXPECTED OUTPUT
----------------------------------------------
(Key_A,Click(US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US))
(Key_B,Click(US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US))
(Key_C,Click(US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US))
(Key_D,Click(US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US,US))
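One caveat on the reduce step: whatever replaces the placeholder above must genuinely be commutative and associative, because reduceByKey applies it map-side in arbitrary order. A hypothetical fieldwise merge with both properties (lexicographic max per field, purely illustrative, reusing the indexed construction from the earlier sketch):

def merge(x: Click, y: Click): Click = {
  val f = x.productIterator.zip(y.productIterator)
    .map { case (a, b) => if (a.toString >= b.toString) a.toString else b.toString }
    .toIndexedSeq
  Click(f(0), f(1), f(2), f(3), f(4), f(5), f(6), f(7), f(8), f(9),
        f(10), f(11), f(12), f(13), f(14), f(15), f(16), f(17), f(18), f(19))
}
guidClickFRDD.reduceByKey(merge).take(200).foreach(println)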