Re: Executor metrics in spark application
I'm also pretty interested in how to create custom Sinks in Spark. I'm using it with Ganglia, and the normal metrics from the JVM source do show up. I tried to create my own metric based on Issac's code, but it does not show up in Ganglia. Does anyone know where the problem is? Here's the code snippet:

    class AccumulatorSource(accumulator: Accumulator[Long], name: String) extends Source {
      override val sourceName = name
      override val metricRegistry = new MetricRegistry()
      metricRegistry.register(MetricRegistry.name(name), new Gauge[Long] {
        override def getValue: Long = accumulator.value
      })
    }

and then in the main:

    val longAccumulator = sc.accumulator[Long](0)
    val accumulatorMetrics = new AccumulatorSource(longAccumulator, counters.accumulator)
    SparkEnv.get.metricsSystem.registerSource(accumulatorMetrics)

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Executor-metrics-in-spark-application-tp188p10385.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Executor metrics in spark application
I meant custom Sources, sorry. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Executor-metrics-in-spark-application-tp188p10386.html
RE: Executor metrics in spark application
Hi Jerry, I know that way of registering a metric, but it seems to defeat the whole purpose. I'd like to define a source whose value is set from within the application, for example the number of parsed messages. If I register it in metrics.properties, how can I obtain the instance (or instances)? How can I set the property? Is there a way to read an accumulator's value from a Source? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Executor-metrics-in-spark-application-tp188p10397.html
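For reference, the metrics.properties wiring I know about looks roughly like this (keys taken from conf/metrics.properties.template as I remember them; the Ganglia host is a placeholder):

```
# conf/metrics.properties -- "*" applies to all instances; use
# driver./executor. prefixes to scope a source or sink to one instance
*.source.jvm.class=org.apache.spark.metrics.source.JvmSource
*.sink.ganglia.class=org.apache.spark.metrics.sink.GangliaSink
*.sink.ganglia.host=ganglia-host
*.sink.ganglia.port=8649
*.sink.ganglia.period=10
*.sink.ganglia.unit=seconds
```

This only covers sources that have a no-argument constructor, which is exactly why it doesn't help with an application-defined counter.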
RE: Executor metrics in spark application
As far as I understand, even if I could register the custom source, there is no way to have a cluster-wide variable to pass to it: an accumulator can be modified by tasks but read only by the driver, and a broadcast value is constant. So it seems this custom metrics/sinks functionality was not really thought through by the developers. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Executor-metrics-in-spark-application-tp188p10464.html
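The closest thing to a workaround I can see is to register the Source on the driver and let the Gauge close over the accumulator, since the metrics system polls getValue on the driver, where reading the accumulator is legal. A minimal sketch of the closure pattern, with stand-in types so it compiles outside Spark (LocalAccumulator mimics only the driver-visible side of Spark's Accumulator; Gauge mirrors the Dropwizard interface):

```scala
// Stand-in for com.codahale.metrics.Gauge
trait Gauge[T] { def getValue: T }

// Stand-in for the driver-visible side of Spark's Accumulator[Long]
class LocalAccumulator(private var v: Long) {
  def add(n: Long): Unit = { v += n }  // in Spark, tasks do acc += n on executors
  def value: Long = v                  // in Spark, only the driver may read this
}

// The gauge closes over the accumulator, so every poll on the driver
// sees the value merged from the tasks so far.
def accumulatorGauge(acc: LocalAccumulator): Gauge[Long] =
  new Gauge[Long] { override def getValue: Long = acc.value }
```

The metric then lags by however often the sink polls, but it does give a cluster-wide counter without any executor-side registration.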
Re: Strange problem with saveAsTextFile after upgrade Spark 0.9.0-1.0.0
I have the same problem (Spark 0.9.1 -> 1.0.0 and it throws the error) and I do call saveAsTextFile. Recompiled for 1.0.0.

    org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0:10 failed 4 times, most recent failure: Exception failure in TID 1616 on host r3s1n03.bigdata.emea.nsn-net.net: java.lang.ClassNotFoundException: org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1
        java.net.URLClassLoader$1.run(URLClassLoader.java:366)
        java.net.URLClassLoader$1.run(URLClassLoader.java:355)
        java.security.AccessController.doPrivileged(Native Method)
        java.net.URLClassLoader.findClass(URLClassLoader.java:354)
        java.lang.ClassLoader.loadClass(ClassLoader.java:423)
        java.lang.ClassLoader.loadClass(ClassLoader.java:356)
        java.lang.Class.forName0(Native Method)
        java.lang.Class.forName(Class.java:264)
        org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:60)
        java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1593)
        java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1514)
        java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1750)
        java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
        java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1964)
        java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1888)
        java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
        java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
        java.io.ObjectInputStream.readObject(ObjectInputStream.java:369)
        org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
        org.apache.spark.scheduler.ResultTask$.deserializeInfo(ResultTask.scala:61)
        org.apache.spark.scheduler.ResultTask.readExternal(ResultTask.scala:141)
        java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1810)
        java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1769)
        java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
        java.io.ObjectInputStream.readObject(ObjectInputStream.java:369)
        org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
        org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:85)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:169)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
        java.lang.Thread.run(Thread.java:722)

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Strange-problem-with-saveAsTextFile-after-upgrade-Spark-0-9-0-1-0-0-tp6832p7121.html
Spark block manager registration extreme slow
Hi, My Spark installations (both 0.9.1 and 1.0.0) start up extremely slowly when launching a simple Spark Streaming job. I have to wait 6 (!) minutes at

    INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager

and another 4 (!) minutes at

    INFO util.MetadataCleaner: Ran metadata cleaner for DAG_SCHEDULER

That adds up to a 10-minute wait to start a job, which is really unacceptable. Can someone help me out with where to look for a solution? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-block-manager-registration-extreme-slow-tp6797.html
Kyro deserialisation error
I tried to use Kryo as the serialiser in Spark Streaming and did everything according to the guide posted on the Spark website, i.e. added the following lines:

    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    conf.set("spark.kryo.registrator", "MyKryoRegistrator")

I also added the necessary classes to MyKryoRegistrator. However, I get the following strange error; can someone help me out with where to look for a solution?

    14/06/03 09:00:49 ERROR scheduler.JobScheduler: Error running job streaming job 140177880 ms.0
    org.apache.spark.SparkException: Job aborted due to stage failure: Exception while deserializing and fetching task: com.esotericsoftware.kryo.KryoException: Unable to find class: J
    Serialization trace:
    id (org.apache.spark.storage.GetBlock)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Kyro-deserialisation-error-tp6798.html
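For reference, here is how I wired it up (a sketch, not the exact code: MyMessage stands in for my actual classes, and the registrator name must be the fully qualified one if the class lives in a package):

```scala
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoRegistrator

// Placeholder registrator: register every custom class that gets shipped
// between the driver and the executors.
class MyKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[MyMessage])  // MyMessage is a placeholder
  }
}

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "MyKryoRegistrator")
```

Note the config values are quoted strings, and the registrator class has to be on the executor classpath as well.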