Hello all,

I've tried to add some task metrics in org.apache.spark.executor.ShuffleReadMetrics.scala in Spark 2.0.2, following the format of the existing metrics, but when I submit an application I get these errors:
ERROR TaskSetManager: Failed to serialize task 0, not attempting to retry it.
java.lang.UnsupportedOperationException: Accumulator must be registered before send to executor
        at org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:158)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at java.io.ObjectStreamClass.invokeWriteReplace(ObjectStreamClass.java:1118)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1136)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
        at org.apache.spark.scheduler.Task$.serializeWithDependencies(Task.scala:231)
        at org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:458)
        ...
16/12/21 16:16:42 ERROR TaskSchedulerImpl: Resource offer failed, task set TaskSet_0 was not serializable
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Failed to serialize task 0, not attempting to retry it.
Exception during serialization: java.lang.UnsupportedOperationException: Accumulator must be registered before send to executor
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)
        ...
16/12/21 16:16:42 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerBlockManagerAdded(1482308202401,BlockManagerId(7, 172.18.11.3, 42715),12409896960)
16/12/21 16:16:42 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerBlockManagerAdded(1482308202445,BlockManagerId(5, 172.18.11.121, 41654),12409896960)

It seems the accumulators for the new task metrics have not been registered before being used, but I did add the new metrics to the nameToAccums map in TaskMetrics.scala:

private[spark] lazy val nameToAccums = LinkedHashMap(
  ...
  // added by txh
  shuffleRead.SHUFFLE_READ_TIME -> shuffleReadMetrics._shuffleReadTime,
  shuffleRead.SHUFFLE_MERGE_TIME -> shuffleReadMetrics._shuffleMergeTime,
  shuffleRead.SHUFFLE_MERGE_MEMORY -> shuffleReadMetrics._shuffleMergeMemory,
  shuffleRead.SHUFFLE_USE_MEMORY -> shuffleReadMetrics._shuffleUseMemory,
  ...
)

What else do I need to add so that these new metrics get registered? Thanks very much.

ShuffleReadMetrics.scala <http://apache-spark-developers-list.1001551.n3.nabble.com/file/n20330/ShuffleReadMetrics.scala>

--
View this message in context: http://apache-spark-developers-list.1001551.n3.nabble.com/Got-an-Accumulator-error-after-adding-some-task-metrics-in-Spark-2-0-2-tp20330.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
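P.S. For context, the registration path I expected these entries to go through looks roughly like this in the 2.0.x sources (a sketch reconstructed from my reading of TaskMetrics.scala, so names and signatures may differ slightly from the exact 2.0.2 tree):

```scala
// Sketch: how entries in nameToAccums normally get registered (Spark 2.0.x).
// The driver calls register(sc) on a stage's TaskMetrics, which walks
// nameToAccums and registers each internal accumulator with the SparkContext:
private[spark] def register(sc: SparkContext): Unit = {
  nameToAccums.foreach {
    case (name, acc) => acc.register(sc, name = Some(name), countFailedValues = true)
  }
}

// AccumulatorV2.writeReplace (AccumulatorV2.scala:158 in the trace above) is
// what throws "Accumulator must be registered before send to executor": it
// fires when an accumulator created on the driver is serialized into a task
// without having been registered (and hence assigned an ID) first.
```

So my understanding is that any accumulator reachable from the serialized task must appear in this registration pass before resourceOffer serializes the task, which is why I added the new fields to nameToAccums in the first place.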