Spark streaming: java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration ... on restart from checkpoint

2015-12-14 Thread alberskib
Hey all,

When my streaming application restarts after a failure (from checkpoint) I
am receiving a strange error:

java.lang.ClassCastException:
org.apache.spark.util.SerializableConfiguration cannot be cast to
com.example.sender.MyClassReporter.

An instance of MyClassReporter is created on the driver side (with the proper
config passed as a constructor arg) and broadcast to the executors, in order to
ensure that there is only a single instance on each worker. Everything goes
well up to the point where I get the value of the broadcast variable and call a
function on it, i.e.
broadcastedValue.value.send(...)
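
The setup is roughly as follows (simplified; the variable names and the place
where broadcast() is called here are illustrative, not the exact code):

// on the driver, before the streaming transformations are defined
// (ssc is the StreamingContext, myConfig/flow are placeholders):
val reporter = new MyClassReporter(myConfig, flow)
val broadcastedValue = ssc.sparkContext.broadcast(reporter)

// inside a transformation / output operation, running on the executors:
broadcastedValue.value.send(name, value, timestamp)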

Below you can find the definition of MyClassReporter (together with its trait):

trait Reporter {
  def send(name: String, value: String, timestamp: Long): Unit
  def flush(): Unit
}

class MyClassReporter(config: MyClassConfig, flow: String) extends Reporter
  with Serializable {

  val prefix = s"${config.senderConfig.prefix}.$flow"

  var counter = 0

  // @transient lazy vals are re-created on first use after deserialization on each executor
  @transient
  private lazy val sender: GraphiteSender = initialize()

  @transient
  private lazy val threadPool =
    ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor())

  private def initialize() = {
    val sender = new Sender(
      new InetSocketAddress(config.senderConfig.hostname, config.senderConfig.port)
    )
    sys.addShutdownHook {
      sender.close()
    }
    sender
  }

  override def send(name: String, value: String, timestamp: Long): Unit = {
    // single-threaded executor, so sends are serialized per reporter instance
    threadPool.submit(new Runnable {
      override def run(): Unit = {
        try {
          counter += 1
          if (!sender.isConnected)
            sender.connect()
          sender.send(s"$prefix.$name", value, timestamp)
          if (counter % graphiteConfig.batchSize == 0)
            sender.flush()
        } catch {
          case NonFatal(e) =>
            println(s"Problem with sending metric to graphite $prefix.$name: $value at $timestamp: ${e.getMessage}", e)
            Try { sender.close() }.recover {
              case NonFatal(e) => println(s"Error closing graphite ${e.getMessage}", e)
            }
        }
      }
    })
  }
}

Do you have any idea how I can solve this issue? Using a broadcast variable
helps me keep a single socket open to the service on each executor.
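
One alternative I am considering is to skip broadcasting the reporter entirely
and keep a lazily created singleton per executor JVM, shipping only the small
config with the closure. A rough sketch (ReporterHolder, myConfig, myFlow and
the record fields below are only illustrative names, not from the actual code):

object ReporterHolder {
  @volatile private var reporter: MyClassReporter = _

  // one reporter (and thus one socket) per executor JVM
  def getOrCreate(config: MyClassConfig, flow: String): MyClassReporter = synchronized {
    if (reporter == null) reporter = new MyClassReporter(config, flow)
    reporter
  }
}

// usage inside the streaming job, e.g.:
stream.foreachRDD { rdd =>
  rdd.foreachPartition { records =>
    val reporter = ReporterHolder.getOrCreate(myConfig, myFlow)
    // r.name / r.value / r.timestamp are placeholders for whatever the records carry
    records.foreach(r => reporter.send(r.name, r.value, r.timestamp))
  }
}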



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-java-lang-ClassCastException-org-apache-spark-util-SerializableConfiguration-on-restt-tp25698.html



Ensuring eager evaluation inside mapPartitions

2015-10-16 Thread alberskib
Hi all,

I am wondering whether there is a way to ensure that two consecutive maps
inside mapPartitions will not be chained together.

To illustrate my question I prepared a short example:

rdd.mapPartitions(it => {
  it.map(x => foo(x)).map(y => y.getResult)
})

I would like to ensure that the foo method is applied to all records of a
partition, and only after that is getResult invoked on each record. This could
be beneficial when foo is some kind of time-consuming IO operation, e.g. a
request to an external service for data that couldn't be prefetched.

I know that converting the iterator into a list will do the job, but maybe
there is a more clever way of doing it.
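
For reference, the straightforward "materialize the partition" version, plus a
batched variant that bounds memory, would look roughly like this (batchSize is
an illustrative value, not something from the original code):

// Force foo(...) for every record in the partition before getResult is applied:
rdd.mapPartitions { it =>
  val intermediate = it.map(x => foo(x)).toVector  // eager: whole partition in memory
  intermediate.iterator.map(y => y.getResult)
}

// If holding a full partition in memory is a concern, force foo per batch instead:
rdd.mapPartitions { it =>
  it.grouped(batchSize).flatMap { batch =>
    batch.map(x => foo(x)).map(y => y.getResult)  // batch is a Seq, so both maps run eagerly
  }
}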



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Ensuring-eager-evaluation-inside-mapPartitions-tp25085.html



Issue with the class generated from avro schema

2015-10-09 Thread alberskib
Hi all, 

I have a piece of code written in Spark that loads data from HDFS into Java
classes generated from Avro IDL. On an RDD created that way I am executing a
simple operation whose result depends on whether I cache the RDD first, i.e. if
I run the code below

val loadedData = loadFromHDFS[Data](path,...)
println(loadedData.map(x => x.getUserId + x.getDate).distinct().count()) // 20
the program will print 20. On the other hand, executing the next code

val loadedData = loadFromHDFS[Data](path,...).cache()
println(loadedData.map(x => x.getUserId + x.getDate).distinct().count()) // 1
results in 1 being printed to stdout.

When I inspect the values of the fields after reading the cached data, it seems
that all records hold identical values.

I am pretty sure that the root cause of the described problem is an issue with
serialization of the classes generated from Avro IDL, but I do not know how to
resolve it. I tried to use Kryo, registering the generated class (Data), and
registering different serializers from chill_avro for that class
(SpecificRecordSerializer, SpecificRecordBinarySerializer, etc.), but none of
these ideas helped.
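
One thing I still want to rule out is object reuse rather than serialization:
Hadoop record readers reuse the same record object, so caching can end up
storing many references to one mutated instance. If that is what loadFromHDFS
does under the hood, deep-copying each record before cache() would be a rough
workaround to try (sketch only, assuming Data is the Avro-generated
SpecificRecord class):

import org.apache.avro.specific.SpecificData

val loadedData = loadFromHDFS[Data](path,...)   // as in the snippet above
  .map(record => SpecificData.get().deepCopy(Data.getClassSchema, record))  // copy before caching
  .cache()

println(loadedData.map(x => x.getUserId + x.getDate).distinct().count()) // should print 20 again if reuse was the culprit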

I posted exactly the same question on Stack Overflow but I did not receive any
response: link
<http://stackoverflow.com/questions/33027851/spark-issue-with-the-class-generated-from-avro-schema>

What is more, I created a minimal working example which makes it easy to
reproduce the problem:
link <https://github.com/alberskib/spark-avro-serialization-issue>

How can I solve this problem?


Thanks,
Bartek



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Issue-with-the-class-generated-from-avro-schema-tp24997.html