Hey all,

When my streaming application is restarting from failure (from checkpoint) I
am receiving strange error:

java.lang.ClassCastException:
org.apache.spark.util.SerializableConfiguration cannot be cast to
com.example.sender.MyClassReporter.

Instance of B class is created on driver side (with proper config passed as
constructor arg) and broadcasted to the executors in order to ensure that on
each worker there will be only single instance. Everything is going well up
to place where I am getting value of broadcasted field and executing
function on it i.e.
broadcastedValue.value.send(...)

Below you can find definition of MyClassReporter (with trait):

trait Reporter{
  def send(name: String, value: String, timestamp: Long) : Unit
  def flush() : Unit
}

class MyClassReporter(config: MyClassConfig, flow: String) extends Reporter
with Serializable {

  val prefix = s"${config.senderConfig.prefix}.$flow"

  var counter = 0

  @transient
  private lazy val sender : GraphiteSender = initialize()

  @transient
  private lazy val threadPool =
ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor())

  private def initialize() = {
      val sender = new Sender(
        new InetSocketAddress(config.senderConfig.hostname,
config.senderConfig.port)
      )
      sys.addShutdownHook{
        sender.close()
      }
      sender
  }

  override def send(name: String, value: String, timestamp: Long) : Unit = {
    threadPool.submit(new Runnable {
      override def run(): Unit = {
        try {
          counter += 1
          if (!sender.isConnected)
            sender.connect()
          sender.send(s"$prefix.$name", value, timestamp)
          if (counter % graphiteConfig.batchSize == 0)
            sender.flush()
        }catch {
          case NonFatal(e) => {
            println(s"Problem with sending metric to graphite $prefix.$name:
$value at $timestamp: ${e.getMessage}", e)
            Try{sender.close()}.recover{
              case NonFatal(e) => println(s"Error closing graphite
${e.getMessage}", e)
            }
          }
        }
      }
    })
  }

Do you have any idea how I can solve this issue? Using broadcasted variable
helps me keeping single socket open to the service on executor.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-java-lang-ClassCastException-org-apache-spark-util-SerializableConfiguration-on-restt-tp25698.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Reply via email to