I have an EC2 cluster created using Spark version 1.2.1.
I also have an SBT project.
Now I want to upgrade to Spark 1.3 and use the new features.
Below are the issues I ran into.
Sorry for the long post.
Appreciate your help.
Thanks
-Roni
Question - Do I have to create a new cluster using Spark 1.3?
Here is what I did -
In my SBT file I changed the Spark dependency to -
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0"
But then I started getting a compilation error, along with these eviction warnings:
Here are some of the libraries that were evicted:
[warn] * org.apache.spark:spark-core_2.10:1.2.0 -> 1.3.0
[warn] * org.apache.hadoop:hadoop-client:(2.5.0-cdh5.2.0, 2.2.0) -> 2.6.0
[warn] Run 'evicted' to see detailed eviction warnings
constructor cannot be instantiated to expected type;
[error] found : (T1, T2)
[error] required: org.apache.spark.sql.catalyst.expressions.Row
[error] val ty = joinRDD.map{case(word, (file1Counts, file2Counts)) => KmerIntesect(word, file1Counts,"xyz")}
[error] ^
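
For reference, the pattern in the failing line is a nested tuple pattern, which matches the element shape of a pair-RDD join: (key, (leftValue, rightValue)). The error says the compiler expected each element to be a Row instead of a tuple. A minimal sketch, with a plain Scala Seq standing in for the RDD (the object name JoinShapeSketch and the sample data are made up for illustration), shows the same pattern compiling when the elements really are tuples:

```scala
case class KmerIntesect(kmer: String, kCount: Int, fileName: String)

object JoinShapeSketch {
  // Stand-in for the element type of a pair-RDD join: (key, (leftValue, rightValue)).
  type Joined = (String, (Int, Int))

  // Same pattern as in the failing line, applied to tuple elements.
  def toIntersect(rows: Seq[Joined]): Seq[KmerIntesect] =
    rows.map { case (word, (file1Counts, _)) => KmerIntesect(word, file1Counts, "xyz") }

  def main(args: Array[String]): Unit = {
    // Hypothetical sample data, not taken from the real input files.
    val sample: Seq[Joined] = Seq(("ACGT", (3, 1)), ("TTGA", (7, 1)))
    toIntersect(sample).foreach(println)
  }
}
```

So the pattern itself is fine for tuples; the open question is why joinRDD is being typed as something whose elements are Row.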
Here are my SBT file and code --
SBT -
version := "1.0"
scalaVersion := "2.10.4"
resolvers += "Sonatype OSS Snapshots" at "https://oss.sonatype.org/content/repositories/snapshots"
resolvers += "Maven Repo1" at "https://repo1.maven.org/maven2"
resolvers += "Maven Repo" at "https://s3.amazonaws.com/h2o-release/h2o-dev/master/1056/maven/repo/"
/* Dependencies - %% appends Scala version to artifactId */
libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.6.0"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0"
libraryDependencies += "org.bdgenomics.adam" % "adam-core" % "0.16.0"
libraryDependencies += "ai.h2o" % "sparkling-water-core_2.10" % "0.2.10"
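
Note that the code uses org.apache.spark.sql.SQLContext, but the build above only lists spark-core, so spark-sql is presumably being pulled in transitively by one of the other dependencies. Whether that transitive version matches 1.3.0 is not visible from this file. One option, as an untested assumption, would be to declare it explicitly at the same version as spark-core:

```scala
// Assumption: declare Spark SQL explicitly so it resolves at the same version as spark-core.
libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.3.0"
```

Running 'evicted' in sbt, as the warning suggests, should show which versions actually end up on the classpath.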
CODE --
import org.apache.spark.{SparkConf, SparkContext}

case class KmerIntesect(kmer: String, kCount: Int, fileName: String)

object preDefKmerIntersection {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("preDefKmer-intersect")
    val sc = new SparkContext(sparkConf)
    // sqlContext has to be defined before its members can be imported
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.createSchemaRDD

    val bedFile = sc.textFile("s3n://a/b/c", 40)
    val hgfasta = sc.textFile("hdfs://a/b/c", 40)

    // Parse "kmer,count" lines into (kmer, count) pairs
    val hgPair = hgfasta.map(_.split(",")).map(a => (a(0), a(1).trim().toInt))
    val filtered = hgPair.filter(kv => kv._2 == 1)
    val bedPair = bedFile.map(_.split(",")).map(a => (a(0), a(1).trim().toInt))

    val joinRDD = bedPair.join(filtered)
    // This is the line the compiler rejects after the upgrade
    val ty = joinRDD.map{case(word, (file1Counts, file2Counts)) => KmerIntesect(word, file1Counts, "xyz")}
    ty.registerTempTable("KmerIntesect")
    ty.saveAsParquetFile("hdfs://x/y/z/kmerIntersect.parquet")
  }
}
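
For comparison, here is a sketch of how the same job might be written against the Spark 1.3 API. It assumes the 1.3 SQL migration notes apply here: the createSchemaRDD implicit is replaced by importing sqlContext.implicits._ and calling toDF() explicitly on the RDD of case classes. The object name PreDefKmerIntersection13 and the helper toIntersect are made up for illustration, the paths are the same placeholders as above, and this has not been run against the cluster:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

case class KmerIntesect(kmer: String, kCount: Int, fileName: String)

object PreDefKmerIntersection13 {
  // Hypothetical helper: keeps the tuple pattern in one plain function.
  def toIntersect(joined: (String, (Int, Int))): KmerIntesect = joined match {
    case (word, (file1Counts, _)) => KmerIntesect(word, file1Counts, "xyz")
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("preDefKmer-intersect"))
    val sqlContext = new SQLContext(sc)
    // Assumption: in Spark 1.3 this import takes the place of sqlContext.createSchemaRDD
    import sqlContext.implicits._

    // Parse "kmer,count" lines into (kmer, count) pairs
    val bedPair = sc.textFile("s3n://a/b/c", 40)
      .map(_.split(",")).map(a => (a(0), a(1).trim.toInt))
    val filtered = sc.textFile("hdfs://a/b/c", 40)
      .map(_.split(",")).map(a => (a(0), a(1).trim.toInt))
      .filter(_._2 == 1)

    // Pair-RDD join: elements are (kmer, (bedCount, hgCount))
    val joinRDD = bedPair.join(filtered)

    // Explicit conversion to a DataFrame instead of relying on an implicit
    val ty = joinRDD.map(toIntersect).toDF()
    ty.registerTempTable("KmerIntesect")
    ty.saveAsParquetFile("hdfs://x/y/z/kmerIntersect.parquet")
  }
}
```

This only covers the compile side; it does not answer whether the existing 1.2.1 cluster can run a jar built against 1.3.0.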