       I want to extract some info from kafka useing sparkstream,my code like :
    val keyword = ""
    val system = "dmp"
    val datetime_idx = 0
    val datetime_length = 23
    val logLevelBeginIdx = datetime_length + 2 - 1
    val logLevelMaxLenght = 5
    val lines = messages.filter(record => 
record.value().matches("\\d{4}.*")).map(record => {
      val assembly = record.topic()
      val value = record.value
      val datatime = value.substring(datetime_idx, datetime_length - 1)
      val level = value.substring(logLevelBeginIdx, logLevelBeginIdx + 
logLevelMaxLenght - 1)
    I will get error :
    Caused by: java.io.NotSerializableException: 
Serialization stack:
 - object not serializable (class: org.apache.spark.streaming.StreamingContext, 
value: org.apache.spark.streaming.StreamingContext@5a457aa1)
 - field (class: $iw, name: streamingContext, type: class 
 - object (class $iw, $iw@38eb2140)
 - field (class: $iw, name: $iw, type: class $iw)
 - object (class $iw, $iw@2a3ced3d)
 - field (class: $iw, name: $iw, type: class $iw)
 - object (class $iw, $iw@7c5dbca5)
  if I change the parameter to constant I will not got error  :
  val lines = messages.filter(record => 
record.value().matches("\\d{4}.*")).map(record => {
      val assembly = record.topic()
      val value = record.value
      val datatime = value.substring(0, 22)
      val level = value.substring(24, 27)

how can I pass parameter to the map function.



Reply via email to