The stack trace shows that the error is thrown from your own code, not from Spark:

    Caused by: scala.MatchError: [Ljava.lang.String;@68d279ec (of class [Ljava.lang.String;)
        at com.jd.deeplog.LogAggregator$.main(LogAggregator.scala:29)
        at com.jd.deeplog.LogAggregator.main(LogAggregator.scala)
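Judging by the code you posted, line 29 of LogAggregator.scala is most likely the exact-arity extractor

    val Array(zkQuorum, group, topics, numThreads, logFormat, logSeparator,
      batchDuration, destType, destPath) = args

That pattern only matches an array of exactly nine elements, and a scala.MatchError on [Ljava.lang.String; is exactly what you get when args has any other length. Your guard only rejects args.length < 9, so ten or more arguments slip through, and in cluster mode the launcher can hand main a different argument list than you expect (for example when shell quoting is lost on submit). As a sketch, assuming the same nine parameters, an explicit match fails with a readable message instead of a MatchError:

    // Sketch, not a drop-in patch: match the argument array explicitly so a
    // wrong argument count produces a clear error instead of a MatchError.
    args match {
      case Array(zkQuorum, group, topics, numThreads, logFormat,
                 logSeparator, batchDuration, destType, destPath) =>
        // ... the existing body of main ...
      case other =>
        // Report what was actually received, then bail out.
        System.err.println(s"Expected 9 arguments, got ${other.length}: ${other.mkString(" ")}")
        System.err.println(usage)
        System.exit(1)
    }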
I think you should debug the code yourself; it may not be a problem with Spark itself.

On Fri, Jul 15, 2016 at 3:17 PM, Mekal Zheng <mekal.zh...@gmail.com> wrote:

> Hi,
>
> I have a Spark Streaming job written in Scala. It runs well in local and
> client mode, but when I submit it in cluster mode, the driver reports the
> error shown below. Does anyone know what is wrong here? Please help me!
>
> The job code is after the log.
>
> 16/07/14 17:28:21 DEBUG ByteBufUtil: -Dio.netty.threadLocalDirectBufferSize: 65536
> 16/07/14 17:28:21 DEBUG NetUtil: Loopback interface: lo (lo, 0:0:0:0:0:0:0:1%lo)
> 16/07/14 17:28:21 DEBUG NetUtil: /proc/sys/net/core/somaxconn: 32768
> 16/07/14 17:28:21 DEBUG TransportServer: Shuffle server started on port :43492
> 16/07/14 17:28:21 INFO Utils: Successfully started service 'Driver' on port 43492.
> 16/07/14 17:28:21 INFO WorkerWatcher: Connecting to worker spark://Worker@172.20.130.98:23933
> 16/07/14 17:28:21 DEBUG TransportClientFactory: Creating new connection to /172.20.130.98:23933
> Exception in thread "main" java.lang.reflect.InvocationTargetException
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:497)
>     at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:58)
>     at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
> Caused by: scala.MatchError: [Ljava.lang.String;@68d279ec (of class [Ljava.lang.String;)
>     at com.jd.deeplog.LogAggregator$.main(LogAggregator.scala:29)
>     at com.jd.deeplog.LogAggregator.main(LogAggregator.scala)
>     ... 6 more
>
> ==================
> Job CODE:
>
> object LogAggregator {
>
>   val batchDuration = Seconds(5)
>
>   def main(args: Array[String]) {
>
>     val usage =
>       """Usage: LogAggregator <zkQuorum> <group> <topics> <numThreads> <logFormat> <logSeparator> <batchDuration> <destType> <destPath>
>         | logFormat: fieldName:fieldRole[,fieldName:fieldRole] each field must have both name and role
>         | logFormat.role: can be key|avg|enum|sum|ignore
>       """.stripMargin
>
>     if (args.length < 9) {
>       System.err.println(usage)
>       System.exit(1)
>     }
>
>     val Array(zkQuorum, group, topics, numThreads, logFormat, logSeparator,
>       batchDuration, destType, destPath) = args
>
>     println("Start streaming calculation...")
>
>     val conf = new SparkConf().setAppName("LBHaproxy-LogAggregator")
>     val ssc = new StreamingContext(conf, Seconds(batchDuration.toInt))
>
>     val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
>
>     val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
>
>     val logFields = logFormat.split(",").map(field => {
>       val fld = field.split(":")
>       if (fld.size != 2) {
>         System.err.println("Wrong parameters for logFormat!\n")
>         System.err.println(usage)
>         System.exit(1)
>       }
>       // TODO: ensure the field has both 'name' and 'role'
>       new LogField(fld(0), fld(1))
>     })
>
>     val keyFields = logFields.filter(logFieldName => {
>       logFieldName.role == "key"
>     })
>     val keys = keyFields.map(key => {
>       key.name
>     })
>
>     val logsByKey = lines.map(line => {
>       val log = new Log(logFields, line, logSeparator)
>       log.toMap
>     }).filter(log => log.nonEmpty).map(log => {
>       val keys = keyFields.map(logField => {
>         log(logField.name).value
>       })
>
>       val key = keys.reduce((key1, key2) => {
>         key1.asInstanceOf[String] + key2.asInstanceOf[String]
>       })
>
>       val fullLog = log + ("count" -> new LogSegment("sum", 1))
>
>       (key, fullLog)
>     })
>
>     val aggResults = logsByKey.reduceByKey((log_a, log_b) => {
>       log_a.map(logField => {
>         val logFieldName = logField._1
>         val logSegment_a = logField._2
>         val logSegment_b = log_b(logFieldName)
>
>         val segValue = logSegment_a.role match {
>           case "avg" => {
>             logSegment_a.value.toString.toInt + logSegment_b.value.toString.toInt
>           }
>           case "sum" => {
>             logSegment_a.value.toString.toInt + logSegment_b.value.toString.toInt
>           }
>           case "enum" => {
>             val list_a = logSegment_a.value.asInstanceOf[List[(String, Int)]]
>             val list_b = logSegment_b.value.asInstanceOf[List[(String, Int)]]
>             list_a ++ list_b
>           }
>           case _ => logSegment_a.value
>         }
>         (logFieldName, new LogSegment(logSegment_a.role, segValue))
>       })
>     }).map(logRecord => {
>       val log = logRecord._2
>       val count = log("count").value.toString.toInt
>
>       val logContent = log.map(logField => {
>         val logFieldName = logField._1
>         val logSegment = logField._2
>         val fieldValue = logSegment.role match {
>           case "avg" => {
>             logSegment.value.toString.toInt / count
>           }
>           case "enum" => {
>             val enumList = logSegment.value.asInstanceOf[List[(String, Int)]]
>             val enumJson = enumList.groupBy(_._1).map(el => el._2.reduce((e1, e2) =>
>               (e1._1, e1._2.toString.toInt + e2._2.toString.toInt)))
>             JSONObject(enumJson)
>           }
>           case _ => logSegment.value
>         }
>         (logFieldName, fieldValue)
>       })
>
>       logContent + ("count" -> count)
>     })
>
>     if (destType == "hbase") {
>
>       val hbaseQuorum = "localhost"
>       val hbaseClientPort = "2181"
>       val hbaseStore = new HBaseStore(hbaseQuorum, hbaseClientPort, keys.toList, "tb_", true)
>
>       val jobConf = hbaseStore.jobConf()
>
>       aggResults.foreachRDD((rdd, time) => {
>         rdd.map(record => {
>           val logPut = hbaseStore.convert(record, time)
>           (new ImmutableBytesWritable, logPut)
>         }).saveAsHadoopDataset(jobConf)
>       })
>     } else if (destType == "file") {
>       aggResults.foreachRDD((rdd, time) => {
>         rdd.foreach(record => {
>           val res = record + ("timestamp" -> time.milliseconds)
>           io.File(destPath).appendAll(res.toString() + "\n")
>         })
>       })
>     }
>
>     ssc.start()
>     ssc.awaitTermination()
>   }
> }
>
> --
> Mekal Zheng
> Sent with Airmail
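As a first debugging step, it may help to print the arguments the driver actually receives before the Array(...) extractor runs, so you can see exactly what cluster mode passes in. A minimal sketch to drop in at the top of main:

    // Debug aid (sketch): dump the raw argument list the driver received,
    // to check whether cluster mode really delivers exactly nine arguments.
    println(s"Received ${args.length} args: ${args.mkString(" | ")}")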