Hi,

I have a Spark Streaming job written in Scala. It runs well in local and
client mode, but when I submit it in cluster mode the driver reports the
error shown below.
Does anyone know what is wrong here?
Please help!

The job code is included after the error log.

16/07/14 17:28:21 DEBUG ByteBufUtil: -Dio.netty.threadLocalDirectBufferSize: 65536
16/07/14 17:28:21 DEBUG NetUtil: Loopback interface: lo (lo, 0:0:0:0:0:0:0:1%lo)
16/07/14 17:28:21 DEBUG NetUtil: /proc/sys/net/core/somaxconn: 32768
16/07/14 17:28:21 DEBUG TransportServer: Shuffle server started on port :43492
16/07/14 17:28:21 INFO Utils: Successfully started service 'Driver' on port 43492.
16/07/14 17:28:21 INFO WorkerWatcher: Connecting to worker spark://Worker@172.20.130.98:23933
16/07/14 17:28:21 DEBUG TransportClientFactory: Creating new connection to /172.20.130.98:23933
Exception in thread "main" java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:58)
        at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
Caused by: scala.MatchError: [Ljava.lang.String;@68d279ec (of class [Ljava.lang.String;)
        at com.jd.deeplog.LogAggregator$.main(LogAggregator.scala:29)
        at com.jd.deeplog.LogAggregator.main(LogAggregator.scala)
        ... 6 more
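
If I am reading the trace right, the MatchError is thrown on an Array[String]
(that is what [Ljava.lang.String; means), and LogAggregator.scala:29 should be
the "val Array(...) = args" destructuring in the code below. As far as I
understand, that pattern only matches an array of exactly nine elements, so
any other length blows up like this. A minimal standalone sketch (made-up
names, not my job code):

object ArityDemo {
  def main(ignored: Array[String]): Unit = {
    val args = Array("a", "b", "c")

    // Exact-arity pattern: matches only a three-element array, so this binds fine.
    val Array(x, y, z) = args
    println(s"$x $y $z")

    // Any other arity fails at runtime with the same message I am seeing:
    //   scala.MatchError: [Ljava.lang.String;@... (of class [Ljava.lang.String;)
    val Array(p, q) = args
  }
}

So I suspect the driver is not getting the nine arguments in cluster mode the
same way it does in client mode, but I don't see why.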

==================
Job CODE:

package com.jd.deeplog

import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

import scala.reflect.io  // assuming this is what provides io.File used below
import scala.util.parsing.json.JSONObject

// Log, LogField, LogSegment and HBaseStore are our own helper classes (definitions not shown).

object LogAggregator {

  val batchDuration = Seconds(5)

  def main(args:Array[String]) {

    val usage =
      """Usage: LogAggregator <zkQuorum> <group> <topics> <numThreads> <logFormat> <logSeparator> <batchDuration> <destType> <destPath>
        |  logFormat: fieldName:fieldRole[,fieldName:fieldRole] each field must have both name and role
        |  logFormat.role: can be key|avg|enum|sum|ignore
      """.stripMargin

    if (args.length < 9) {
      System.err.println(usage)
      System.exit(1)
    }
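    // Note: the Array(...) destructuring below is an exact-arity pattern: it
    // only matches an array with exactly nine elements and throws a
    // scala.MatchError for any other length, while this guard only rejects
    // fewer than nine.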

    val Array(zkQuorum, group, topics, numThreads, logFormat,
      logSeparator, batchDuration, destType, destPath) = args

    println("Start streaming calculation...")

    val conf = new SparkConf().setAppName("LBHaproxy-LogAggregator")
    val ssc = new StreamingContext(conf, Seconds(batchDuration.toInt))

    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap

    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)

    val logFields = logFormat.split(",").map(field => {
      val fld = field.split(":")
      if (fld.size != 2) {
        System.err.println("Wrong parameters for logFormat!\n")
        System.err.println(usage)
        System.exit(1)
      }
      // TODO: ensure the field has both 'name' and 'role'
      new LogField(fld(0), fld(1))
    })

    val keyFields = logFields.filter(logFieldName => {
      logFieldName.role == "key"
    })
    val keys = keyFields.map(key => {
      key.name
    })

    val logsByKey = lines.map(line => {
      val log = new Log(logFields, line, logSeparator)
      log.toMap
    }).filter(log => log.nonEmpty).map(log => {
      val keys = keyFields.map(logField => {
        log(logField.name).value
      })

      val key = keys.reduce((key1, key2) => {
        key1.asInstanceOf[String] + key2.asInstanceOf[String]
      })

      val fullLog = log + ("count" -> new LogSegment("sum", 1))

      (key, fullLog)
    })


    val aggResults = logsByKey.reduceByKey((log_a, log_b) => {

      log_a.map(logField => {
        val logFieldName = logField._1
        val logSegment_a = logField._2
        val logSegment_b = log_b(logFieldName)

        val segValue = logSegment_a.role match {
          case "avg" => {
            logSegment_a.value.toString.toInt + logSegment_b.value.toString.toInt
          }
          case "sum" => {
            logSegment_a.value.toString.toInt + logSegment_b.value.toString.toInt
          }
          case "enum" => {
            val list_a = logSegment_a.value.asInstanceOf[List[(String, Int)]]
            val list_b = logSegment_b.value.asInstanceOf[List[(String, Int)]]
            list_a ++ list_b
          }
          case _ => logSegment_a.value
        }
        (logFieldName, new LogSegment(logSegment_a.role, segValue))
      })
    }).map(logRecord => {
      val log = logRecord._2
      val count = log("count").value.toString.toInt


      val logContent = log.map(logField => {
        val logFieldName = logField._1
        val logSegment = logField._2
        val fieldValue = logSegment.role match {
          case "avg" => {
            logSegment.value.toString.toInt / count
          }
          case "enum" => {
            val enumList = logSegment.value.asInstanceOf[List[(String, Int)]]
            val enumJson = enumList.groupBy(_._1).map(el =>
              el._2.reduce((e1, e2) => (e1._1, e1._2.toString.toInt + e2._2.toString.toInt)))
            JSONObject(enumJson)
          }
          case _ => logSegment.value
        }
        (logFieldName, fieldValue)
      })

      logContent + ("count" -> count)
    })

    if (destType == "hbase") {

      val hbaseQuorum = "localhost"
      val hbaseClientPort = "2181"
      val hbaseStore = new HBaseStore(hbaseQuorum, hbaseClientPort, keys.toList, "tb_", true)

      val jobConf = hbaseStore.jobConf()

      aggResults.foreachRDD((rdd, time) => {
        rdd.map(record => {
          val logPut = hbaseStore.convert(record, time)
          (new ImmutableBytesWritable, logPut)
        }).saveAsHadoopDataset(jobConf)
      })
    } else if (destType == "file") {
      aggResults.foreachRDD((rdd, time) => {
        rdd.foreach(record => {
          val res = record + ("timestamp" -> time.milliseconds)
          io.File(destPath).appendAll(res.toString() + "\n")
        })
      })
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
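
For debugging I am thinking of printing the raw arguments at the very top of
main, before the Array destructuring, to see what the driver actually
receives in cluster mode, e.g.:

    println(s"driver received ${args.length} args: ${args.mkString(" | ")}")

Does that look like a reasonable way to check, or is there a better place to look?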


-- 
Mekal Zheng
Sent with Airmail
