The exception in the stack trace is thrown from your own code:

Caused by: scala.MatchError: [Ljava.lang.String;@68d279ec (of class
[Ljava.lang.String;)
        at com.jd.deeplog.LogAggregator$.main(LogAggregator.scala:29)
        at com.jd.deeplog.LogAggregator.main(LogAggregator.scala)

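I think you should debug the code yourself; this is probably not a problem
in Spark. A scala.MatchError on a String array most likely comes from the
`val Array(...) = args` pattern: the `args.length < 9` check only rejects
fewer than nine arguments, but the pattern matches exactly nine, so any
extra argument passed in cluster mode makes it fail.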

On Fri, Jul 15, 2016 at 3:17 PM, Mekal Zheng <mekal.zh...@gmail.com> wrote:

> Hi,
>
> I have a Spark Streaming job written in Scala that runs well in local
> and client mode, but when I submit it in cluster mode, the driver reports
> the error shown below.
> Does anyone know what is wrong here?
> Please help me!
>
> The job code follows the log output below.
>
> 16/07/14 17:28:21 DEBUG ByteBufUtil:
> -Dio.netty.threadLocalDirectBufferSize: 65536
> 16/07/14 17:28:21 DEBUG NetUtil: Loopback interface: lo (lo,
> 0:0:0:0:0:0:0:1%lo)
> 16/07/14 17:28:21 DEBUG NetUtil: /proc/sys/net/core/somaxconn: 32768
> 16/07/14 17:28:21 DEBUG TransportServer: Shuffle server started on port
> :43492
> 16/07/14 17:28:21 INFO Utils: Successfully started service 'Driver' on
> port 43492.
> 16/07/14 17:28:21 INFO WorkerWatcher: Connecting to worker spark://
> Worker@172.20.130.98:23933
> 16/07/14 17:28:21 DEBUG TransportClientFactory: Creating new connection to
> /172.20.130.98:23933
> Exception in thread "main" java.lang.reflect.InvocationTargetException
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:497)
>         at
> org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:58)
>         at
> org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
> Caused by: scala.MatchError: [Ljava.lang.String;@68d279ec (of class
> [Ljava.lang.String;)
>         at com.jd.deeplog.LogAggregator$.main(LogAggregator.scala:29)
>         at com.jd.deeplog.LogAggregator.main(LogAggregator.scala)
>         ... 6 more
>
> ==================
> Job CODE:
>
> object LogAggregator {
>
>   val batchDuration = Seconds(5)
>
>   def main(args:Array[String]) {
>
>     val usage =
>       """Usage: LogAggregator <zkQuorum> <group> <topics> <numThreads> 
> <logFormat> <logSeparator> <batchDuration> <destType> <destPath>
>         |  logFormat: fieldName:fieldRole[,fieldName:fieldRole] each field 
> must have both name and role
>         |  logFormat.role: can be key|avg|enum|sum|ignore
>       """.stripMargin
>
>     if (args.length < 9) {
>       System.err.println(usage)
>       System.exit(1)
>     }
>
>     val Array(zkQuorum, group, topics, numThreads, logFormat, logSeparator, 
> batchDuration, destType, destPath) = args
>
>     println("Start streaming calculation...")
>
>     val conf = new SparkConf().setAppName("LBHaproxy-LogAggregator")
>     val ssc = new StreamingContext(conf, Seconds(batchDuration.toInt))
>
>     val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
>
>     val lines = KafkaUtils.createStream(ssc, zkQuorum, group, 
> topicMap).map(_._2)
>
>     val logFields = logFormat.split(",").map(field => {
>       val fld = field.split(":")
>       if (fld.size != 2) {
>         System.err.println("Wrong parameters for logFormat!\n")
>         System.err.println(usage)
>         System.exit(1)
>       }
>       // TODO: ensure the field has both 'name' and 'role'
>       new LogField(fld(0), fld(1))
>     })
>
>     val keyFields = logFields.filter(logFieldName => {
>       logFieldName.role == "key"
>     })
>     val keys = keyFields.map(key => {
>       key.name
>     })
>
>     val logsByKey = lines.map(line => {
>       val log = new Log(logFields, line, logSeparator)
>       log.toMap
>     }).filter(log => log.nonEmpty).map(log => {
>       val keys = keyFields.map(logField => {
>         log(logField.name).value
>       })
>
>       val key = keys.reduce((key1, key2) => {
>         key1.asInstanceOf[String] + key2.asInstanceOf[String]
>       })
>
>       val fullLog = log + ("count" -> new LogSegment("sum", 1))
>
>       (key, fullLog)
>     })
>
>
>     val aggResults = logsByKey.reduceByKey((log_a, log_b) => {
>
>       log_a.map(logField => {
>         val logFieldName = logField._1
>         val logSegment_a = logField._2
>         val logSegment_b = log_b(logFieldName)
>
>         val segValue = logSegment_a.role match {
>           case "avg" => {
>             logSegment_a.value.toString.toInt + 
> logSegment_b.value.toString.toInt
>           }
>           case "sum" => {
>             logSegment_a.value.toString.toInt + 
> logSegment_b.value.toString.toInt
>           }
>           case "enum" => {
>             val list_a = logSegment_a.value.asInstanceOf[List[(String, Int)]]
>             val list_b = logSegment_b.value.asInstanceOf[List[(String, Int)]]
>             list_a ++ list_b
>           }
>           case _ => logSegment_a.value
>         }
>         (logFieldName, new LogSegment(logSegment_a.role, segValue))
>       })
>     }).map(logRecord => {
>       val log = logRecord._2
>       val count = log("count").value.toString.toInt
>
>
>       val logContent = log.map(logField => {
>         val logFieldName = logField._1
>         val logSegment = logField._2
>         val fieldValue = logSegment.role match {
>           case "avg" => {
>             logSegment.value.toString.toInt / count
>           }
>           case "enum" => {
>             val enumList = logSegment.value.asInstanceOf[List[(String, Int)]]
>             val enumJson = enumList.groupBy(_._1).map(el => el._2.reduce((e1, 
> e2) => (e1._1, e1._2.toString.toInt + e2._2.toString.toInt)))
>             JSONObject(enumJson)
>           }
>           case _ => logSegment.value
>         }
>         (logFieldName, fieldValue)
>       })
>
>       logContent + ("count" -> count)
>     })
>
>     if (destType == "hbase") {
>
>       val hbaseQuorum = "localhost"
>       val hbaseClientPort = "2181"
>       val hbaseStore = new HBaseStore(hbaseQuorum, hbaseClientPort, 
> keys.toList, "tb_", true)
>
>       val jobConf = hbaseStore.jobConf()
>
>       aggResults.foreachRDD((rdd, time) => {
>         rdd.map(record => {
>           val logPut = hbaseStore.convert(record, time)
>           (new ImmutableBytesWritable, logPut)
>         }).saveAsHadoopDataset(jobConf)
>       })
>     } else if (destType == "file") {
>       aggResults.foreachRDD((rdd, time) => {
>         rdd.foreach(record => {
>           val res = record + ("timestamp" -> time.milliseconds)
>           io.File(destPath).appendAll(res.toString() + "\n")
>         })
>       })
>     }
>
>     ssc.start()
>     ssc.awaitTermination()
>   }
>
>
> --
> Mekal Zheng
> Sent with Airmail
>

Reply via email to