Hi, I have a Spark Streaming job written in Scala. It runs fine in local and client mode, but when I submit it in cluster mode the driver fails with the error shown below (the full job code is included after the log). Does anyone know what is wrong here? Please help!
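The stack trace points at LogAggregator.scala:29, which I believe is the line that destructures args into nine values. As far as I understand, that Array pattern in a val definition throws scala.MatchError whenever the array does not contain exactly nine elements. A minimal standalone sketch of the pattern I mean (ArgsMatchSketch is just a placeholder name):

object ArgsMatchSketch {
  def main(args: Array[String]): Unit = {
    // Matches only an array of exactly nine elements;
    // any other length throws scala.MatchError at runtime.
    val Array(zkQuorum, group, topics, numThreads, logFormat,
              logSeparator, batchDuration, destType, destPath) = args
    println(s"parsed: $zkQuorum $group $topics $numThreads ...")
  }
}

What I don't understand is why the same submission only triggers this in cluster mode.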
16/07/14 17:28:21 DEBUG ByteBufUtil: -Dio.netty.threadLocalDirectBufferSize: 65536
16/07/14 17:28:21 DEBUG NetUtil: Loopback interface: lo (lo, 0:0:0:0:0:0:0:1%lo)
16/07/14 17:28:21 DEBUG NetUtil: /proc/sys/net/core/somaxconn: 32768
16/07/14 17:28:21 DEBUG TransportServer: Shuffle server started on port :43492
16/07/14 17:28:21 INFO Utils: Successfully started service 'Driver' on port 43492.
16/07/14 17:28:21 INFO WorkerWatcher: Connecting to worker spark://Worker@172.20.130.98:23933
16/07/14 17:28:21 DEBUG TransportClientFactory: Creating new connection to /172.20.130.98:23933
Exception in thread "main" java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:58)
        at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
Caused by: scala.MatchError: [Ljava.lang.String;@68d279ec (of class [Ljava.lang.String;)
        at com.jd.deeplog.LogAggregator$.main(LogAggregator.scala:29)
        at com.jd.deeplog.LogAggregator.main(LogAggregator.scala)
        ... 6 more

==================
Job CODE:

object LogAggregator {

  val batchDuration = Seconds(5)

  def main(args: Array[String]) {

    val usage =
      """Usage: LogAggregator <zkQuorum> <group> <topics> <numThreads> <logFormat> <logSeparator> <batchDuration> <destType> <destPath>
        | logFormat: fieldName:fieldRole[,fieldName:fieldRole] each field must have both name and role
        | logFormat.role: can be key|avg|enum|sum|ignore
      """.stripMargin

    if (args.length < 9) {
      System.err.println(usage)
      System.exit(1)
    }

    val Array(zkQuorum, group, topics, numThreads, logFormat, logSeparator, batchDuration, destType, destPath) = args

    println("Start streaming calculation...")

    val conf = new SparkConf().setAppName("LBHaproxy-LogAggregator")
    val ssc = new StreamingContext(conf, Seconds(batchDuration.toInt))

    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)

    val logFields = logFormat.split(",").map(field => {
      val fld = field.split(":")
      if (fld.size != 2) {
        System.err.println("Wrong parameters for logFormat!\n")
        System.err.println(usage)
        System.exit(1)
      }
      // TODO: ensure the field has both 'name' and 'role'
      new LogField(fld(0), fld(1))
    })

    val keyFields = logFields.filter(logFieldName => {
      logFieldName.role == "key"
    })
    val keys = keyFields.map(key => {
      key.name
    })

    val logsByKey = lines.map(line => {
      val log = new Log(logFields, line, logSeparator)
      log.toMap
    }).filter(log => log.nonEmpty).map(log => {
      val keys = keyFields.map(logField => {
        log(logField.name).value
      })

      val key = keys.reduce((key1, key2) => {
        key1.asInstanceOf[String] + key2.asInstanceOf[String]
      })

      val fullLog = log + ("count" -> new LogSegment("sum", 1))

      (key, fullLog)
    })

    val aggResults = logsByKey.reduceByKey((log_a, log_b) => {
      log_a.map(logField => {
        val logFieldName = logField._1
        val logSegment_a = logField._2
        val logSegment_b = log_b(logFieldName)

        val segValue = logSegment_a.role match {
          case "avg" => {
            logSegment_a.value.toString.toInt + logSegment_b.value.toString.toInt
          }
          case "sum" => {
            logSegment_a.value.toString.toInt + logSegment_b.value.toString.toInt
          }
          case "enum" => {
            val list_a = logSegment_a.value.asInstanceOf[List[(String, Int)]]
            val list_b = logSegment_b.value.asInstanceOf[List[(String, Int)]]
            list_a ++ list_b
          }
          case _ => logSegment_a.value
        }

        (logFieldName, new LogSegment(logSegment_a.role, segValue))
      })
    }).map(logRecord => {
      val log = logRecord._2
      val count = log("count").value.toString.toInt

      val logContent = log.map(logField => {
        val logFieldName = logField._1
        val logSegment = logField._2

        val fieldValue = logSegment.role match {
          case "avg" => {
            logSegment.value.toString.toInt / count
          }
          case "enum" => {
            val enumList = logSegment.value.asInstanceOf[List[(String, Int)]]
            val enumJson = enumList.groupBy(_._1).map(el =>
              el._2.reduce((e1, e2) => (e1._1, e1._2.toString.toInt + e2._2.toString.toInt)))
            JSONObject(enumJson)
          }
          case _ => logSegment.value
        }
        (logFieldName, fieldValue)
      })

      logContent + ("count" -> count)
    })

    if (destType == "hbase") {
      val hbaseQuorum = "localhost"
      val hbaseClientPort = "2181"
      val hbaseStore = new HBaseStore(hbaseQuorum, hbaseClientPort, keys.toList, "tb_", true)
      val jobConf = hbaseStore.jobConf()

      aggResults.foreachRDD((rdd, time) => {
        rdd.map(record => {
          val logPut = hbaseStore.convert(record, time)
          (new ImmutableBytesWritable, logPut)
        }).saveAsHadoopDataset(jobConf)
      })
    } else if (destType == "file") {
      aggResults.foreachRDD((rdd, time) => {
        rdd.foreach(record => {
          val res = record + ("timestamp" -> time.milliseconds)
          io.File(destPath).appendAll(res.toString() + "\n")
        })
      })
    }

    ssc.start()
    ssc.awaitTermination()
  }
}

--
Mekal Zheng
Sent with Airmail