This is an automated email from the ASF dual-hosted git repository.

lresende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-toree.git


The following commit(s) were added to refs/heads/master by this push:
     new 145599f5 Add listener plugin for Jupyter Spark monitor (#229)
145599f5 is described below

commit 145599f53d3bfe25edef38db394befcf71ce2f44
Author: vincent <[email protected]>
AuthorDate: Thu Oct 9 16:39:25 2025 -0700

    Add listener plugin for Jupyter Spark monitor (#229)
    
    
    ---------
    
    Co-authored-by: Vincent Ye <[email protected]>
    Co-authored-by: Luciano Resende <[email protected]>
---
 Makefile                                           |  14 +-
 README.md                                          |  53 ++
 build.sbt                                          |   9 +-
 plugins/build.sbt                                  |   2 +-
 project/Dependencies.scala                         |   2 +
 {plugins => spark-monitor-plugin}/build.sbt        |  26 +-
 .../sparkmonitor/JupyterSparkMonitorListener.scala | 781 +++++++++++++++++++++
 .../apache/toree/plugins/sparkmonitor/README.md    | 227 ++++++
 .../plugins/sparkmonitor/SparkMonitorPlugin.scala  | 276 ++++++++
 9 files changed, 1382 insertions(+), 8 deletions(-)

diff --git a/Makefile b/Makefile
index a2bc3117..473c791d 100644
--- a/Makefile
+++ b/Makefile
@@ -103,6 +103,8 @@ dev-binder: .binder-image
                --workdir /home/main/notebooks $(BINDER_IMAGE) \
                /home/main/start-notebook.sh --ip=0.0.0.0
 
+SPARK_MONITOR_JAR:=toree-spark-monitor-plugin-assembly-$(VERSION)$(SNAPSHOT).jar
+
 target/scala-$(SCALA_VERSION)/$(ASSEMBLY_JAR): VM_WORKDIR=/src/toree-kernel
 target/scala-$(SCALA_VERSION)/$(ASSEMBLY_JAR): ${shell find ./*/src/main/**/*}
 target/scala-$(SCALA_VERSION)/$(ASSEMBLY_JAR): ${shell find ./*/build.sbt}
@@ -110,7 +112,14 @@ target/scala-$(SCALA_VERSION)/$(ASSEMBLY_JAR): ${shell 
find ./project/*.scala} $
 target/scala-$(SCALA_VERSION)/$(ASSEMBLY_JAR): dist/toree-legal 
project/build.properties build.sbt
        $(call RUN,$(ENV_OPTS) sbt root/assembly)
 
-build: target/scala-$(SCALA_VERSION)/$(ASSEMBLY_JAR)
+spark-monitor-plugin/target/scala-$(SCALA_VERSION)/$(SPARK_MONITOR_JAR): 
VM_WORKDIR=/src/toree-kernel
+spark-monitor-plugin/target/scala-$(SCALA_VERSION)/$(SPARK_MONITOR_JAR): 
${shell find ./spark-monitor-plugin/src/main/**/*}
+spark-monitor-plugin/target/scala-$(SCALA_VERSION)/$(SPARK_MONITOR_JAR): 
spark-monitor-plugin/build.sbt
+spark-monitor-plugin/target/scala-$(SCALA_VERSION)/$(SPARK_MONITOR_JAR): 
${shell find ./project/*.scala} ${shell find ./project/*.sbt}
+spark-monitor-plugin/target/scala-$(SCALA_VERSION)/$(SPARK_MONITOR_JAR): 
project/build.properties build.sbt
+       $(call RUN,$(ENV_OPTS) sbt sparkMonitorPlugin/assembly)
+
+build: target/scala-$(SCALA_VERSION)/$(ASSEMBLY_JAR) 
spark-monitor-plugin/target/scala-$(SCALA_VERSION)/$(SPARK_MONITOR_JAR)
 
 test: VM_WORKDIR=/src/toree-kernel
 test:
@@ -119,9 +128,10 @@ test:
 sbt-%:
        $(call RUN,$(ENV_OPTS) sbt $(subst sbt-,,$@) )
 
-dist/toree/lib: target/scala-$(SCALA_VERSION)/$(ASSEMBLY_JAR)
+dist/toree/lib: target/scala-$(SCALA_VERSION)/$(ASSEMBLY_JAR) 
spark-monitor-plugin/target/scala-$(SCALA_VERSION)/$(SPARK_MONITOR_JAR)
        @mkdir -p dist/toree/lib
        @cp target/scala-$(SCALA_VERSION)/$(ASSEMBLY_JAR) dist/toree/lib/.
+       @cp 
spark-monitor-plugin/target/scala-$(SCALA_VERSION)/$(SPARK_MONITOR_JAR) 
dist/toree/lib/.
 
 dist/toree/bin: ${shell find ./etc/bin/*}
        @mkdir -p dist/toree/bin
diff --git a/README.md b/README.md
index 0ab068a3..a02f4b45 100644
--- a/README.md
+++ b/README.md
@@ -80,6 +80,59 @@ This results in 2 packages.
 
 NOTE: `make release` uses `docker`. Please refer to `docker` installation 
instructions for your system.
 
+## Building Individual Components
+
+### Main Toree Assembly
+To build just the main Toree assembly jar (without spark-monitor-plugin):
+```
+sbt assembly
+```
+This creates: `target/scala-2.12/toree-assembly-<VERSION>.jar`
+
+### Spark Monitor Plugin
+To build the spark-monitor-plugin as a separate jar:
+```
+sbt sparkMonitorPlugin/assembly
+```
+This creates: 
`spark-monitor-plugin/target/scala-2.12/spark-monitor-plugin-<VERSION>.jar`
+
+### Build All Components
+To compile all projects including both the main assembly and 
spark-monitor-plugin:
+```
+sbt compile
+```
+
+**Note**: The spark-monitor-plugin is now built as a separate jar and is not 
included in the main Toree assembly.
+
+## Using the Spark Monitor Plugin
+
+To enable the Spark Monitor Plugin in your Toree application, you need to 
specify the path to the plugin JAR when starting Toree:
+
+### Option 1: Command Line Parameter
+```bash
+# Start Toree with spark-monitor-plugin enabled
+java -jar target/scala-2.12/toree-assembly-<VERSION>.jar --magic-url 
file:///path/to/spark-monitor-plugin/target/scala-2.12/spark-monitor-plugin-<VERSION>.jar
 [other-options]
+```
+
+### Option 2: Jupyter Kernel Installation
+When installing Toree as a Jupyter kernel, you can specify the plugin:
+```bash
+jupyter toree install --spark_home=<YOUR_SPARK_PATH> 
--kernel_name=toree_with_monitor --toree_opts="--magic-url 
file:///path/to/spark-monitor-plugin-<VERSION>.jar"
+```
+
+### Option 3: Configuration File
+You can also specify the plugin in a configuration file and use the 
`--profile` option:
+```json
+{
+  "magic_urls": ["file:///path/to/spark-monitor-plugin-<VERSION>.jar"]
+}
+```
+Then start with: `java -jar toree-assembly.jar --profile config.json`
+
+**Important**:
+- Make sure to use the absolute path to the spark-monitor-plugin JAR file and 
ensure the JAR is accessible from the location where Toree is running.
+- The JAR file name does not contain "toree" prefix to avoid automatic loading 
as an internal plugin. This allows you to control when the SparkMonitorPlugin 
is enabled via the `--magic-url` parameter.
+
 Run Examples
 ============
 To play with the example notebooks, run
diff --git a/build.sbt b/build.sbt
index 3fb4030b..37fa63b6 100644
--- a/build.sbt
+++ b/build.sbt
@@ -126,7 +126,7 @@ ThisBuild / credentials += Credentials(Path.userHome / 
".ivy2" / ".credentials")
 lazy val root = (project in file("."))
   .settings(name := "toree")
   .aggregate(
-    
macros,protocol,plugins,communication,kernelApi,client,scalaInterpreter,sqlInterpreter,kernel
+    
macros,protocol,plugins,sparkMonitorPlugin,communication,kernelApi,client,scalaInterpreter,sqlInterpreter,kernel
   )
   .dependsOn(
     
macros,protocol,communication,kernelApi,client,scalaInterpreter,sqlInterpreter,kernel
@@ -154,6 +154,13 @@ lazy val plugins = (project in file("plugins"))
   .settings(name := "toree-plugins")
   .dependsOn(macros)
 
+/**
+  * Project representing the SparkMonitor plugin for Toree.
+  */
+lazy val sparkMonitorPlugin = (project in file("spark-monitor-plugin"))
+  .settings(name := "toree-spark-monitor-plugin")
+  .dependsOn(macros, protocol, plugins, kernel, kernelApi)
+
 /**
   * Project representing forms of communication used as input/output for the
   * client/kernel.
diff --git a/plugins/build.sbt b/plugins/build.sbt
index 88f9a494..d61c0a35 100644
--- a/plugins/build.sbt
+++ b/plugins/build.sbt
@@ -21,7 +21,7 @@ Test / fork := true
 libraryDependencies ++= Seq(
   Dependencies.scalaReflect.value,
   Dependencies.clapper,
-  Dependencies.slf4jApi
+  Dependencies.slf4jApi,
 )
 
 // Test dependencies
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 08d7bf86..3b8c101a 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -84,4 +84,6 @@ object Dependencies {
     )
   }
 
+  val py4j = "net.sf.py4j" % "py4j" % "0.10.7" % "provided"
+
 }
diff --git a/plugins/build.sbt b/spark-monitor-plugin/build.sbt
similarity index 60%
copy from plugins/build.sbt
copy to spark-monitor-plugin/build.sbt
index 88f9a494..2c7d19f6 100644
--- a/plugins/build.sbt
+++ b/spark-monitor-plugin/build.sbt
@@ -17,12 +17,30 @@
 
 Test / fork := true
 
-// Needed for type inspection
+// Needed for SparkMonitor plugin
+libraryDependencies ++= Dependencies.sparkAll.value
 libraryDependencies ++= Seq(
-  Dependencies.scalaReflect.value,
-  Dependencies.clapper,
-  Dependencies.slf4jApi
+  Dependencies.playJson,
+  Dependencies.py4j
 )
 
 // Test dependencies
 libraryDependencies += Dependencies.scalaCompiler.value % "test"
+
+// Assembly configuration for separate jar
+enablePlugins(AssemblyPlugin)
+
+assembly / assemblyMergeStrategy := {
+  case "module-info.class" => MergeStrategy.discard
+  case PathList("META-INF", "versions", "9", "module-info.class") => 
MergeStrategy.discard
+  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
+  case x =>
+    val oldStrategy = (assembly / assemblyMergeStrategy).value
+    oldStrategy(x)
+}
+
+assembly / assemblyOption ~= {
+  _.withIncludeScala(false)
+}
+
+assembly / test := {}
\ No newline at end of file
diff --git 
a/spark-monitor-plugin/src/main/scala/org/apache/toree/plugins/sparkmonitor/JupyterSparkMonitorListener.scala
 
b/spark-monitor-plugin/src/main/scala/org/apache/toree/plugins/sparkmonitor/JupyterSparkMonitorListener.scala
new file mode 100644
index 00000000..cde7e656
--- /dev/null
+++ 
b/spark-monitor-plugin/src/main/scala/org/apache/toree/plugins/sparkmonitor/JupyterSparkMonitorListener.scala
@@ -0,0 +1,781 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License
+ */
+
+package org.apache.toree.plugins.sparkmonitor
+
+import org.apache.spark.scheduler._
+import play.api.libs.json._
+import org.apache.spark._
+import org.apache.spark.JobExecutionStatus
+
+import scala.collection.mutable
+import scala.collection.mutable.{ HashMap, HashSet, ListBuffer }
+import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue}
+import java.util.{TimerTask,Timer}
+import org.apache.toree.comm.CommWriter
+import org.apache.toree.kernel.protocol.v5.MsgData
+import org.slf4j.LoggerFactory
+
+import scala.util.Try
+
+/**
+ * A SparkListener implementation that forwards data to a Jupyter kernel via comm.
+ *
+ *  - All data is forwarded to a Jupyter kernel using the comm channel.
+ *  - The listener receives notifications of the Spark application's events
+ *    through the overridden methods.
+ *  - The received data is stored and sent as JSON to the kernel via comm.
+ *  - Overrides methods that correspond to events in a Spark application.
+ *  - The argument of each overridden method contains the received data for
+ *    that event. (See the SparkListener docs for more information.)
+ *  - For each application, job, stage, and task there is a 'start' and an
+ *    'end' event. For executors, there are 'added' and 'removed' events.
+ *
+ *  @constructor called by the plugin system
+ *  @param getCommWriter supplies the comm writer to send messages through,
+ *                       or None when no comm writer is available
+ */
+class JupyterSparkMonitorListener(getCommWriter: () => Option[CommWriter]) 
extends SparkListener {
+
+  val logger = LoggerFactory.getLogger(this.getClass())
+  logger.info("Started JupyterSparkMonitorListener for Jupyter Notebook")
+  
+  var onStageStatusActiveTask: TimerTask = null
+  val sparkTasksQueue: BlockingQueue[String] = new 
LinkedBlockingQueue[String]()
+  val sparkStageActiveTasksMaxMessages: Integer = 250
+  val sparkStageActiveRate: Long = 1000L // 1s
+
+  logger.info("Starting timer task for active stage monitoring")
+  startActiveStageMonitoring()
+
+  /**
+   * Send a JSON message via the comm channel.
+   *
+   * The comm writer is looked up exactly once per call, so the "send" and
+   * "no comm" branches always agree on whether a writer was available.
+   * Failures while writing are logged and never propagated to the caller.
+   *
+   * @param json the message to stringify and forward
+   */
+  def send(json: JsValue): Unit = {
+    val jsonString = Json.stringify(json)
+    getCommWriter() match {
+      case Some(writer) =>
+        try {
+          // Create MsgData with the JSON content and send it directly
+          val msgData = MsgData("msgtype" -> "fromscala", "msg" -> jsonString)
+          writer.writeMsg(msgData)
+        } catch {
+          // Deliberately best-effort: a failed write must not break the caller
+          case exception: Throwable =>
+            logger.error("Exception sending comm message: ", exception)
+            // Fallback: just log the message
+            logger.debug(s"SparkMonitor event: $jsonString")
+        }
+      case None =>
+        // If no comm writer, just log the message
+        logger.debug(s"SparkMonitor event (no comm): $jsonString")
+    }
+  }
+
+  /**
+   * Start the periodic task that reports the status of active stages.
+   *
+   * Schedules `onStageStatusActive()` on a dedicated timer thread, first after
+   * `sparkStageActiveRate` milliseconds and then repeatedly at that period.
+   * The timer thread is a daemon, so it cannot keep the JVM alive on its own.
+   * Any failure while creating or scheduling the task is logged, not thrown.
+   */
+  def startActiveStageMonitoring(): Unit = {
+    var timer: Timer = null
+    try {
+      timer = new Timer("toree-spark-monitor-active-stages", true)
+
+      if (onStageStatusActiveTask == null) {
+        onStageStatusActiveTask = new TimerTask {
+          def run(): Unit = {
+            try {
+              onStageStatusActive()
+            } catch {
+              // An exception escaping run() terminates the timer thread and
+              // would silently stop every later update.
+              case exception: Exception =>
+                logger.error("Exception in active stage monitoring task: ", exception)
+            }
+          }
+        }
+      }
+      timer.schedule(onStageStatusActiveTask, sparkStageActiveRate, sparkStageActiveRate)
+    } catch {
+      case exception: Throwable =>
+        // Scheduling failed (e.g. the task was already scheduled or
+        // cancelled): release the thread of the timer created above.
+        if (timer != null) {
+          timer.cancel()
+        }
+        logger.error("Exception creating timer task: ", exception)
+    }
+  }
+
+  /** Cancel the periodic active-stage task, if one has been created. */
+  def stopActiveStageMonitoring(): Unit = {
+    logger.info("Stopping active stage monitoring")
+    Option(onStageStatusActiveTask).foreach(_.cancel())
+  }
+
+  type JobId = Int
+  type JobGroupId = String
+  type StageId = Int
+  type StageAttemptId = Int
+
+  //Application
+  @volatile var startTime = -1L
+  @volatile var endTime = -1L
+  var appId: String = ""
+
+  //Jobs
+  val activeJobs = new HashMap[JobId, UIData.JobUIData]
+  val completedJobs = ListBuffer[UIData.JobUIData]()
+  val failedJobs = ListBuffer[UIData.JobUIData]()
+  val jobIdToData = new HashMap[JobId, UIData.JobUIData]
+  val jobGroupToJobIds = new HashMap[JobGroupId, HashSet[JobId]]
+
+  // Stages:
+  val pendingStages = new HashMap[StageId, StageInfo]
+  val activeStages = new HashMap[StageId, StageInfo]
+  val completedStages = ListBuffer[StageInfo]()
+  val skippedStages = ListBuffer[StageInfo]()
+  val failedStages = ListBuffer[StageInfo]()
+  val stageIdToData = new HashMap[(StageId, StageAttemptId), 
UIData.StageUIData]
+  val stageIdToInfo = new HashMap[StageId, StageInfo]
+  val stageIdToActiveJobIds = new HashMap[StageId, HashSet[JobId]]
+
+  var numCompletedStages = 0
+  var numFailedStages = 0
+  var numCompletedJobs = 0
+  var numFailedJobs = 0
+
+  val retainedStages = 1000
+  val retainedJobs = 1000
+  val retainedTasks = 100000
+
+  @volatile
+  var totalNumActiveTasks = 0
+  val executorCores = new HashMap[String, Int]
+  @volatile var totalCores: Int = 0
+  @volatile var numExecutors: Int = 0
+
+  /**
+   * Invoked when the Spark application starts.
+   *
+   * Records the start time and the application ID (the string "null" when
+   * the event carries none) and forwards a `sparkApplicationStart` message.
+   */
+  override def onApplicationStart(appStarted: SparkListenerApplicationStart): Unit = {
+    startTime = appStarted.time
+    appId = appStarted.appId.getOrElse("null")
+    logger.info(s"Application Started: $appId  ...Start Time: ${appStarted.time}")
+
+    val attemptId: String = appStarted.appAttemptId.getOrElse("null")
+    send(Json.obj(
+      "msgtype" -> "sparkApplicationStart",
+      "startTime" -> startTime,
+      "appId" -> appId,
+      "appAttemptId" -> attemptId,
+      "appName" -> appStarted.appName,
+      "sparkUser" -> appStarted.sparkUser
+    ))
+  }
+
+  /**
+   * Invoked when the Spark application ends.
+   *
+   * Records the end time, forwards a `sparkApplicationEnd` message and then
+   * stops the active stage monitoring task.
+   */
+  override def onApplicationEnd(appEnded: SparkListenerApplicationEnd): Unit = {
+    logger.info(s"Application ending...End Time: ${appEnded.time}")
+    endTime = appEnded.time
+    send(Json.obj("msgtype" -> "sparkApplicationEnd", "endTime" -> endTime))
+    stopActiveStageMonitoring()
+  }
+
+  /**
+   * Build the JSON representation of a stage.
+   *
+   * @param stageInfo the stage to convert
+   * @return an object with a single field, keyed by the stage ID, holding the
+   *         stage details; unset completion/submission times are encoded as -1
+   */
+  def stageInfoToJSON(stageInfo: StageInfo): JsObject = {
+    val details = Json.obj(
+      "attemptId" -> stageInfo.attemptNumber(),
+      "name" -> stageInfo.name,
+      "numTasks" -> stageInfo.numTasks,
+      "completionTime" -> stageInfo.completionTime.getOrElse(-1L),
+      "submissionTime" -> stageInfo.submissionTime.getOrElse(-1L)
+    )
+    Json.obj(stageInfo.stageId.toString -> details)
+  }
+
+  /**
+   * Called when a job starts.
+   *
+   * The jobStart object contains the list of planned stages. They are stored
+   * for tracking skipped stages. The total number of tasks is also estimated
+   * from the list of planned stages. A `sparkJobStart` message is forwarded.
+   *
+   * `jobStart.properties` is treated as nullable everywhere: a job without
+   * properties gets no job group and the name "null".
+   */
+  override def onJobStart(jobStart: SparkListenerJobStart): Unit = synchronized {
+    val properties = Option(jobStart.properties)
+    val jobGroup = properties.flatMap(p => Option(p.getProperty("spark.jobGroup.id")))
+    // Negative times are treated as "not set"
+    val submissionTime = Option(jobStart.time).filter(_ >= 0)
+
+    val jobData: UIData.JobUIData =
+      new UIData.JobUIData(
+        jobId = jobStart.jobId,
+        submissionTime = submissionTime,
+        stageIds = jobStart.stageIds,
+        jobGroup = jobGroup,
+        status = JobExecutionStatus.RUNNING)
+    jobGroupToJobIds.getOrElseUpdate(jobGroup.orNull, new HashSet[JobId]).add(jobStart.jobId)
+    jobStart.stageInfos.foreach(x => pendingStages(x.stageId) = x)
+
+    // Merge all stage info objects into one JSON object
+    val stageinfojson = jobStart.stageInfos.foldLeft(Json.obj()) { (acc, stageInfo) =>
+      acc ++ stageInfoToJSON(stageInfo)
+    }
+
+    // Only stages that have not completed yet contribute to the task estimate
+    jobData.numTasks = {
+      val allStages = jobStart.stageInfos
+      val missingStages = allStages.filter(_.completionTime.isEmpty)
+      missingStages.map(_.numTasks).sum
+    }
+    jobIdToData(jobStart.jobId) = jobData
+    activeJobs(jobStart.jobId) = jobData
+    for (stageId <- jobStart.stageIds) {
+      stageIdToActiveJobIds.getOrElseUpdate(stageId, new HashSet[JobId]).add(jobStart.jobId)
+    }
+    // If there's no information for a stage, store the StageInfo received
+    // from the scheduler so that we can display stage descriptions for
+    // pending stages:
+    for (stageInfo <- jobStart.stageInfos) {
+      stageIdToInfo.getOrElseUpdate(stageInfo.stageId, stageInfo)
+      stageIdToData.getOrElseUpdate(
+        (stageInfo.stageId, stageInfo.attemptNumber()), new UIData.StageUIData)
+    }
+    val name = properties.map(_.getProperty("callSite.short", "null")).getOrElse("null")
+    val json = Json.obj(
+      "msgtype" -> "sparkJobStart",
+      "jobGroup" -> jobGroup.getOrElse[String]("null"),
+      "jobId" -> jobStart.jobId,
+      "status" -> "RUNNING",
+      "submissionTime" -> submissionTime,
+      "stageIds" -> jobStart.stageIds,
+      "stageInfos" -> stageinfojson,
+      "numTasks" -> jobData.numTasks,
+      "totalCores" -> totalCores,
+      "appId" -> appId,
+      "numExecutors" -> numExecutors,
+      "name" -> name
+    )
+    logger.info("Job Start: " + jobStart.jobId)
+    logger.debug(Json.prettyPrint(json))
+    send(json)
+  }
+
+  /**
+   * Invoked when a job finishes.
+   *
+   * Moves the job from the active set to the completed or failed list,
+   * removes it from the stage-to-active-jobs index, counts stages that were
+   * never submitted as skipped, and forwards a `sparkJobEnd` message.
+   */
+  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = synchronized {
+    val finishedJobId = jobEnd.jobId
+    val jobData = activeJobs.remove(finishedJobId) match {
+      case Some(known) => known
+      case None =>
+        logger.info("Job completed for unknown job: " + finishedJobId)
+        new UIData.JobUIData(jobId = finishedJobId)
+    }
+    jobData.completionTime = Option(jobEnd.time).filter(_ >= 0)
+    jobData.stageIds.foreach(pendingStages.remove)
+
+    // Anything other than JobSucceeded is reported as a failure
+    val status = jobEnd.jobResult match {
+      case JobSucceeded =>
+        completedJobs += jobData
+        trimJobsIfNecessary(completedJobs)
+        jobData.status = JobExecutionStatus.SUCCEEDED
+        numCompletedJobs += 1
+        "COMPLETED"
+      case _ =>
+        failedJobs += jobData
+        trimJobsIfNecessary(failedJobs)
+        jobData.status = JobExecutionStatus.FAILED
+        numFailedJobs += 1
+        "FAILED"
+    }
+
+    for (
+      stageId <- jobData.stageIds;
+      jobsUsingStage <- stageIdToActiveJobIds.get(stageId)
+    ) {
+      jobsUsingStage.remove(finishedJobId)
+      if (jobsUsingStage.isEmpty) {
+        stageIdToActiveJobIds.remove(stageId)
+      }
+      // A stage that was never submitted won't complete, so mark it as
+      // "skipped":
+      for (stageInfo <- stageIdToInfo.get(stageId) if stageInfo.submissionTime.isEmpty) {
+        skippedStages += stageInfo
+        trimStagesIfNecessary(skippedStages)
+        jobData.numSkippedStages += 1
+        jobData.numSkippedTasks += stageInfo.numTasks
+      }
+    }
+
+    val json = Json.obj(
+      "msgtype" -> "sparkJobEnd",
+      "jobId" -> finishedJobId,
+      "status" -> status,
+      "completionTime" -> jobData.completionTime
+    )
+
+    logger.info("Job End: " + finishedJobId)
+    logger.debug(Json.prettyPrint(json))
+
+    send(json)
+  }
+
+  /**
+   * Invoked when a stage completes.
+   *
+   * Moves the stage from the active set to the completed or failed list
+   * (depending on whether it carries a failure reason), updates the counters
+   * of every active job that depends on it, and forwards a
+   * `sparkStageCompleted` message.
+   */
+  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = synchronized {
+    val stage = stageCompleted.stageInfo
+    val stageId = stage.stageId
+    stageIdToInfo(stageId) = stage
+    val stageData = stageIdToData.getOrElseUpdate((stageId, stage.attemptNumber()), {
+      logger.info("Stage completed for unknown stage " + stageId)
+      new UIData.StageUIData
+    })
+    activeStages.remove(stageId)
+
+    val status = stage.failureReason match {
+      case None =>
+        completedStages += stage
+        numCompletedStages += 1
+        trimStagesIfNecessary(completedStages)
+        "COMPLETED"
+      case Some(_) =>
+        failedStages += stage
+        numFailedStages += 1
+        trimStagesIfNecessary(failedStages)
+        "FAILED"
+    }
+
+    val jobIds = stageIdToActiveJobIds.get(stageId)
+    jobIds.foreach { dependentJobs =>
+      dependentJobs.foreach { jobId =>
+        jobIdToData.get(jobId).foreach { jobData =>
+          jobData.numActiveStages -= 1
+          if (stage.failureReason.nonEmpty) {
+            jobData.numFailedStages += 1
+          } else if (stage.submissionTime.isDefined) {
+            jobData.completedStageIndices.add(stageId)
+          }
+        }
+      }
+    }
+
+    // Unset times are encoded as -1
+    val json = Json.obj(
+      "msgtype" -> "sparkStageCompleted",
+      "stageId" -> stageId,
+      "stageAttemptId" -> stage.attemptNumber(),
+      "completionTime" -> stage.completionTime.getOrElse(-1L),
+      "submissionTime" -> stage.submissionTime.getOrElse(-1L),
+      "numTasks" -> stage.numTasks,
+      "numFailedTasks" -> stageData.numFailedTasks,
+      "numCompletedTasks" -> stageData.numCompletedTasks,
+      "status" -> status,
+      "jobIds" -> jobIds
+    )
+
+    logger.info("Stage Completed: " + stageId)
+    logger.debug(Json.prettyPrint(json))
+    send(json)
+  }
+
+  /**
+   * Invoked when a stage is submitted for execution.
+   *
+   * Moves the stage from the pending set to the active set, records its
+   * description, updates the counters of every active job that depends on
+   * it, and forwards a `sparkStageSubmitted` message.
+   */
+  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = synchronized {
+    val stage = stageSubmitted.stageInfo
+    val stageId = stage.stageId
+    activeStages(stageId) = stage
+    pendingStages.remove(stageId)
+    stageIdToInfo(stageId) = stage
+
+    val stageData = stageIdToData.getOrElseUpdate(
+      (stageId, stage.attemptNumber()), new UIData.StageUIData)
+    stageData.description = for (
+      props <- Option(stageSubmitted.properties);
+      text <- Option(props.getProperty("spark.job.description"))
+    ) yield text
+
+    val jobIds = stageIdToActiveJobIds.get(stageId)
+    jobIds.foreach { dependentJobs =>
+      dependentJobs.foreach { jobId =>
+        jobIdToData.get(jobId).foreach { jobData =>
+          jobData.numActiveStages += 1
+          // If a stage retries again, it should be removed from the
+          // completedStageIndices set
+          jobData.completedStageIndices.remove(stageId)
+        }
+      }
+    }
+
+    // An unset submission time is encoded as -1
+    val json = Json.obj(
+      "msgtype" -> "sparkStageSubmitted",
+      "stageId" -> stageId,
+      "stageAttemptId" -> stage.attemptNumber(),
+      "name" -> stage.name,
+      "numTasks" -> stage.numTasks,
+      "parentIds" -> stage.parentIds,
+      "submissionTime" -> stage.submissionTime.getOrElse(-1L),
+      "jobIds" -> jobIds
+    )
+    logger.info("Stage Submitted: " + stageId)
+    logger.debug(Json.prettyPrint(json))
+    send(json)
+  }
+
+  /**
+   * Called by the monitoring timer to push a periodic update.
+   *
+   * Forwards one `sparkStageActive` message per active stage, then flushes at
+   * most `sparkStageActiveTasksMaxMessages` buffered task messages from
+   * `sparkTasksQueue`.
+   *
+   * This runs on the timer thread, while the listener callbacks mutate the
+   * stage maps under this object's monitor. The stage messages are therefore
+   * built while holding the same monitor and sent after it is released.
+   */
+  def onStageStatusActive(): Unit = {
+    // Snapshot the status of active stages under the listener lock
+    val stageUpdates = synchronized {
+      activeStages.values.toList.map { stageInfo =>
+        val stageData = stageIdToData.getOrElseUpdate(
+          (stageInfo.stageId, stageInfo.attemptNumber()), new UIData.StageUIData)
+        val jobIds = stageIdToActiveJobIds.get(stageInfo.stageId)
+
+        stageInfo.stageId -> Json.obj(
+          "msgtype" -> "sparkStageActive",
+          "stageId" -> stageInfo.stageId,
+          "stageAttemptId" -> stageInfo.attemptNumber(),
+          "name" -> stageInfo.name,
+          "parentIds" -> stageInfo.parentIds,
+          "numTasks" -> stageInfo.numTasks,
+          "numActiveTasks" -> stageData.numActiveTasks,
+          "numFailedTasks" -> stageData.numFailedTasks,
+          "numCompletedTasks" -> stageData.numCompletedTasks,
+          "jobIds" -> jobIds
+        )
+      }
+    }
+
+    for ((stageId, json) <- stageUpdates) {
+      logger.info("Stage Update: " + stageId)
+      logger.debug(Json.prettyPrint(json))
+      send(json)
+    }
+
+    // Emit at most sparkStageActiveTasksMaxMessages task details from the
+    // queue to the frontend. poll() never blocks: null means the queue is
+    // empty.
+    val maxMessages: Int = sparkStageActiveTasksMaxMessages
+    var count = 0
+    var drained = false
+    while (!drained && count < maxMessages) {
+      val jsonString = sparkTasksQueue.poll()
+      if (jsonString == null) {
+        drained = true
+      } else {
+        count += 1
+        // Already stringified JSON, parse and send
+        Try(Json.parse(jsonString)).foreach(send)
+      }
+    }
+
+    if (count > 0) {
+      logger.info("Stage Tasks details updated: " + count)
+    }
+  }
+
+  /**
+   * Called when a task is started.
+   *
+   * Increments the active-task counters of the task's stage and of every
+   * active job that depends on that stage, then buffers a `sparkTaskStart`
+   * message in `sparkTasksQueue` for periodic flushing.
+   *
+   * When the event carries no task info, the stage counter is left untouched
+   * and no message is buffered, because every message field comes from it.
+   */
+  override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = synchronized {
+    val taskInfo = taskStart.taskInfo
+    if (taskInfo != null) {
+      val stageData = stageIdToData.getOrElseUpdate(
+        (taskStart.stageId, taskStart.stageAttemptId), {
+          logger.info("Task start for unknown stage " + taskStart.stageId)
+          new UIData.StageUIData
+        })
+      stageData.numActiveTasks += 1
+    }
+    for (
+      activeJobsDependentOnStage <- stageIdToActiveJobIds.get(taskStart.stageId);
+      jobId <- activeJobsDependentOnStage;
+      jobData <- jobIdToData.get(jobId)
+    ) {
+      jobData.numActiveTasks += 1
+    }
+
+    if (taskInfo == null) {
+      logger.warn("Task start without task info for stage " + taskStart.stageId)
+    } else {
+      val json = Json.obj(
+        "msgtype" -> "sparkTaskStart",
+        "launchTime" -> taskInfo.launchTime,
+        "taskId" -> taskInfo.taskId,
+        "stageId" -> taskStart.stageId,
+        "stageAttemptId" -> taskStart.stageAttemptId,
+        "index" -> taskInfo.index,
+        "attemptNumber" -> taskInfo.attemptNumber,
+        "executorId" -> taskInfo.executorId,
+        "host" -> taskInfo.host,
+        "status" -> taskInfo.status,
+        "speculative" -> taskInfo.speculative
+      )
+
+      logger.info("Task Start: " + taskInfo.taskId)
+      logger.debug(Json.prettyPrint(json))
+
+      // Buffer the message for periodic flushing
+      sparkTasksQueue.put(Json.stringify(json))
+    }
+  }
+
+  /** Called when a task is ended. */
+  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
+    val info = taskEnd.taskInfo
+    // If stage attempt id is -1, it means the DAGScheduler had no idea which 
attempt this task
+    // completion event is for. Let's just drop it here. This means we might 
have some speculation
+    // tasks on the web ui that's never marked as complete.
+    var errorMessage: Option[String] = None
+    if (info != null && taskEnd.stageAttemptId != -1) {
+      val stageData = stageIdToData.getOrElseUpdate((taskEnd.stageId, 
taskEnd.stageAttemptId), {
+        logger.info("Task end for unknown stage " + taskEnd.stageId)
+        new UIData.StageUIData
+      })
+      stageData.numActiveTasks -= 1
+      errorMessage = taskEnd.reason match {
+        case org.apache.spark.Success =>
+          stageData.completedIndices.add(info.index)
+          stageData.numCompletedTasks += 1
+          None
+        case e: ExceptionFailure => // Handle ExceptionFailure because we 
might have accumUpdates
+          stageData.numFailedTasks += 1
+          Some(e.toErrorString)
+        case e: TaskFailedReason => // All other failure cases
+          stageData.numFailedTasks += 1
+          Some(e.toErrorString)
+      }
+
+      for (
+        activeJobsDependentOnStage <- 
stageIdToActiveJobIds.get(taskEnd.stageId);
+        jobId <- activeJobsDependentOnStage;
+        jobData <- jobIdToData.get(jobId)
+      ) {
+        jobData.numActiveTasks -= 1
+        taskEnd.reason match {
+          case Success =>
+            jobData.numCompletedTasks += 1
+          case _ =>
+            jobData.numFailedTasks += 1
+        }
+      }
+    }
+
+    val totalExecutionTime = info.finishTime - info.launchTime
+    def toProportion(time: Long) = time.toDouble / totalExecutionTime * 100
+    var metricsOpt = Option(taskEnd.taskMetrics)
+    val shuffleReadTime = 
metricsOpt.map(_.shuffleReadMetrics.fetchWaitTime).getOrElse(0L)
+    val shuffleReadTimeProportion = toProportion(shuffleReadTime)
+    val shuffleWriteTime = 
(metricsOpt.map(_.shuffleWriteMetrics.writeTime).getOrElse(0L) / 1e6).toLong
+    val shuffleWriteTimeProportion = toProportion(shuffleWriteTime)
+    val serializationTime = 
metricsOpt.map(_.resultSerializationTime).getOrElse(0L)
+    val serializationTimeProportion = toProportion(serializationTime)
+    val deserializationTime = 
metricsOpt.map(_.executorDeserializeTime).getOrElse(0L)
+    val deserializationTimeProportion = toProportion(deserializationTime)
+    val gettingResultTime = if (info.gettingResult) {
+      if (info.finished) {
+        info.finishTime - info.gettingResultTime
+      } else {
+        0L //currentTime - info.gettingResultTime
+      }
+    } else {
+      0L
+    }
+    val gettingResultTimeProportion = toProportion(gettingResultTime)
+    val executorOverhead = serializationTime + deserializationTime
+    val executorRunTime = 
metricsOpt.map(_.executorRunTime).getOrElse(totalExecutionTime - 
executorOverhead - gettingResultTime)
+    val schedulerDelay = math.max(0, totalExecutionTime - executorRunTime - 
executorOverhead - gettingResultTime)
+    val schedulerDelayProportion = toProportion(schedulerDelay)
+    val executorComputingTime = executorRunTime - shuffleReadTime - 
shuffleWriteTime
+    val executorComputingTimeProportion =
+      math.max(100 - schedulerDelayProportion - shuffleReadTimeProportion -
+        shuffleWriteTimeProportion - serializationTimeProportion -
+        deserializationTimeProportion - gettingResultTimeProportion, 0)
+
+    val schedulerDelayProportionPos = 0
+    val deserializationTimeProportionPos = schedulerDelayProportionPos + 
schedulerDelayProportion
+    val shuffleReadTimeProportionPos = deserializationTimeProportionPos + 
deserializationTimeProportion
+    val executorRuntimeProportionPos = shuffleReadTimeProportionPos + 
shuffleReadTimeProportion
+    val shuffleWriteTimeProportionPos = executorRuntimeProportionPos + 
executorComputingTimeProportion
+    val serializationTimeProportionPos = shuffleWriteTimeProportionPos + 
shuffleWriteTimeProportion
+    val gettingResultTimeProportionPos = serializationTimeProportionPos + 
serializationTimeProportion
+
+    val jsonMetrics = if (metricsOpt.isDefined) {
+      Json.obj(
+        "shuffleReadTime" -> shuffleReadTime,
+        "shuffleWriteTime" -> shuffleWriteTime,
+        "serializationTime" -> serializationTime,
+        "deserializationTime" -> deserializationTime,
+        "gettingResultTime" -> gettingResultTime,
+        "executorComputingTime" -> executorComputingTime,
+        "schedulerDelay" -> schedulerDelay,
+        "shuffleReadTimeProportion" -> shuffleReadTimeProportion,
+        "shuffleWriteTimeProportion" -> shuffleWriteTimeProportion,
+        "serializationTimeProportion" -> serializationTimeProportion,
+        "deserializationTimeProportion" -> deserializationTimeProportion,
+        "gettingResultTimeProportion" -> gettingResultTimeProportion,
+        "executorComputingTimeProportion" -> executorComputingTimeProportion,
+        "schedulerDelayProportion" -> schedulerDelayProportion,
+        "shuffleReadTimeProportionPos" -> shuffleReadTimeProportionPos,
+        "shuffleWriteTimeProportionPos" -> shuffleWriteTimeProportionPos,
+        "serializationTimeProportionPos" -> serializationTimeProportionPos,
+        "deserializationTimeProportionPos" -> deserializationTimeProportionPos,
+        "gettingResultTimeProportionPos" -> gettingResultTimeProportionPos,
+        "executorComputingTimeProportionPos" -> executorRuntimeProportionPos,
+        "schedulerDelayProportionPos" -> schedulerDelayProportionPos,
+        "resultSize" -> metricsOpt.get.resultSize,
+        "jvmGCTime" -> metricsOpt.get.jvmGCTime,
+        "memoryBytesSpilled" -> metricsOpt.get.memoryBytesSpilled,
+        "diskBytesSpilled" -> metricsOpt.get.diskBytesSpilled,
+        "peakExecutionMemory" -> metricsOpt.get.peakExecutionMemory,
+        "test" -> info.gettingResultTime
+      )
+    } else {
+      Json.obj()
+    }
+
+    val json = Json.obj(
+      "msgtype" -> "sparkTaskEnd",
+      "launchTime" -> info.launchTime,
+      "finishTime" -> info.finishTime,
+      "taskId" -> info.taskId,
+      "stageId" -> taskEnd.stageId,
+      "taskType" -> taskEnd.taskType,
+      "stageAttemptId" -> taskEnd.stageAttemptId,
+      "index" -> info.index,
+      "attemptNumber" -> info.attemptNumber,
+      "executorId" -> info.executorId,
+      "host" -> info.host,
+      "status" -> info.status,
+      "speculative" -> info.speculative,
+      "errorMessage" -> errorMessage,
+      "metrics" -> jsonMetrics
+    )
+
+    logger.info("Task Ended: " + info.taskId)
+    logger.debug(Json.prettyPrint(json))
+
+    // Buffer the message for periodic flushing
+    sparkTasksQueue.put(Json.stringify(json))
+  }
+
+  /**
+   * Drops entries from the front of `stages` once the buffer holds more than
+   * `retainedStages` items, removing the matching per-stage lookup entries too.
+   */
+  private def trimStagesIfNecessary(stages: ListBuffer[StageInfo]) = synchronized {
+    val stageCount = stages.size
+    if (stageCount > retainedStages) {
+      val evictCount = calculateNumberToRemove(stageCount, retainedStages)
+      // Forget the lookup data first, then shrink the buffer itself.
+      for (stale <- stages.take(evictCount)) {
+        stageIdToData.remove((stale.stageId, stale.attemptNumber()))
+        stageIdToInfo.remove(stale.stageId)
+      }
+      stages.trimStart(evictCount)
+    }
+  }
+
+  /**
+   * Drops entries from the front of `jobs` once the buffer holds more than
+   * `retainedJobs` items, and cleans up the job and job-group lookups for them.
+   */
+  private def trimJobsIfNecessary(jobs: ListBuffer[UIData.JobUIData]) = synchronized {
+    val jobCount = jobs.size
+    if (jobCount > retainedJobs) {
+      val evictCount = calculateNumberToRemove(jobCount, retainedJobs)
+      for (stale <- jobs.take(evictCount)) {
+        // Only jobs that still have UI data need their group mapping cleaned up.
+        for (dropped <- jobIdToData.remove(stale.jobId)) {
+          // Jobs run without a job group are keyed under a null group id.
+          val groupKey = dropped.jobGroup.orNull
+          for (members <- jobGroupToJobIds.get(groupKey)) {
+            members.remove(stale.jobId)
+            // The group entry goes away together with its last job.
+            if (members.isEmpty) {
+              jobGroupToJobIds.remove(groupKey)
+            }
+          }
+        }
+      }
+      jobs.trimStart(evictCount)
+    }
+  }
+
+  /**
+   * Number of items to evict from a store of `dataSize` items capped at `retainedSize`:
+   * the overflow, but never less than a tenth of the cap.
+   */
+  private def calculateNumberToRemove(dataSize: Int, retainedSize: Int): Int = {
+    val minimumBatch = retainedSize / 10
+    val overflow = dataSize - retainedSize
+    if (overflow > minimumBatch) overflow else minimumBatch
+  }
+
+  /**
+   * Handles an executor joining: records its core count, updates the running
+   * totals and sends a "sparkExecutorAdded" message.
+   */
+  override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = synchronized {
+    val id = executorAdded.executorId
+    val details = executorAdded.executorInfo
+    val cores = details.totalCores
+
+    // Remember the core count so it can be subtracted when the executor goes away.
+    executorCores(id) = cores
+    totalCores += cores
+    numExecutors += 1
+
+    val json = Json.obj(
+      "msgtype" -> "sparkExecutorAdded",
+      "executorId" -> id,
+      "time" -> executorAdded.time,
+      "host" -> details.executorHost,
+      "numCores" -> cores,
+      // Sent with every message because browser-side state can be lost during reloads.
+      "totalCores" -> totalCores
+    )
+
+    logger.info(s"Executor Added: $id")
+    logger.debug(Json.prettyPrint(json))
+    send(json)
+  }
+
+  /**
+   * Handles an executor leaving: updates the running totals and sends a
+   * "sparkExecutorRemoved" message.
+   *
+   * The executor's entry is removed from `executorCores` so the map does not grow
+   * with every executor ever seen. The totals are only adjusted for executors that
+   * were recorded by `onExecutorAdded`; a removal for an executor this listener never
+   * saw added would otherwise push `numExecutors` below the number actually tracked.
+   */
+  override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = synchronized {
+    executorCores.remove(executorRemoved.executorId) match {
+      case Some(cores) =>
+        totalCores -= cores
+        numExecutors -= 1
+      case None =>
+        logger.debug("Removed executor was not tracked: " + executorRemoved.executorId)
+    }
+    val json = Json.obj(
+      "msgtype" -> "sparkExecutorRemoved",
+      "executorId" -> executorRemoved.executorId,
+      "time" -> executorRemoved.time,
+      // Sent with every message because browser-side state can be lost during reloads.
+      "totalCores" -> totalCores
+    )
+
+    logger.info("Executor Removed: " + executorRemoved.executorId)
+    logger.debug(Json.prettyPrint(json))
+
+    send(json)
+  }
+}
+
+/** Data structures for storing data received from listener events. */
+object UIData {
+
+  /**
+   * Data about a job.
+   *
+   * This is stored to track aggregated values such as number of stages and tasks,
+   * and to track skipped and failed stages. Every field is a mutable `var` with a
+   * default, so an instance can be created empty and filled in later.
+   */
+  class JobUIData(
+    var jobId: Int = -1,
+    var submissionTime: Option[Long] = None,
+    var completionTime: Option[Long] = None,
+    var stageIds: Seq[Int] = Seq.empty,
+    var jobGroup: Option[String] = None,
+    var status: JobExecutionStatus = JobExecutionStatus.UNKNOWN,
+    var numTasks: Int = 0,
+    var numActiveTasks: Int = 0,
+    var numCompletedTasks: Int = 0,
+    var numSkippedTasks: Int = 0,
+    var numFailedTasks: Int = 0,
+    var reasonToNumKilled: Map[String, Int] = Map.empty,
+    var numActiveStages: Int = 0,
+    // This needs to be a set instead of a simple count to prevent 
double-counting of rerun stages:
+    var completedStageIndices: mutable.HashSet[Int] = new 
mutable.HashSet[Int](),
+    var numSkippedStages: Int = 0,
+    var numFailedStages: Int = 0)
+
+  /**
+   * Data about a stage.
+   *
+   * This is stored to track aggregated values such as number of tasks.
+   * The counters start at zero and `description` starts as `None`.
+   */
+  class StageUIData {
+    var numActiveTasks: Int = _
+    var numCompletedTasks: Int = _
+    var completedIndices = new HashSet[Int]()
+    var numFailedTasks: Int = _
+    var description: Option[String] = None
+  }
+
+  /**
+   * Data about an executor.
+   *
+   * When an executor is removed, its number of cores is not available, so it is
+   * looked up here.
+   *
+   * NOTE(review): the executor handlers visible in the listener read core counts
+   * from a separate `executorCores` map - confirm whether this class is still used.
+   */
+  class ExecutorData {
+    var numCores: Int = _
+    var executorId: String = _
+    var timeAdded: Long = _
+    var timeRemoved: Long = _
+    var executorHost: String = _
+  }
+}
diff --git 
a/spark-monitor-plugin/src/main/scala/org/apache/toree/plugins/sparkmonitor/README.md
 
b/spark-monitor-plugin/src/main/scala/org/apache/toree/plugins/sparkmonitor/README.md
new file mode 100644
index 00000000..d638f13b
--- /dev/null
+++ 
b/spark-monitor-plugin/src/main/scala/org/apache/toree/plugins/sparkmonitor/README.md
@@ -0,0 +1,227 @@
+# SparkMonitor Plugin
+
+The SparkMonitor plugin provides real-time monitoring of Apache Spark jobs, 
stages, and tasks through a Jupyter comm channel in Apache Toree.
+
+## Features
+
+- **SparkListener Integration**: Automatically registers a 
JupyterSparkMonitorListener to the SparkContext when Spark becomes ready
+- **Communication Channel**: Provides a "SparkMonitor" comm target for 
bi-directional communication with clients
+- **Real-time Updates**: Sends comprehensive real-time notifications about:
+  - Application start/end events
+  - Job start/end events with detailed information
+  - Stage submission/completion events with metrics
+  - Task start/end events with performance data
+  - Executor addition/removal events
+- **Error Handling**: Robust error handling to prevent plugin failures from 
affecting the kernel
+- **Performance Monitoring**: Detailed task metrics including execution time, 
shuffle data, and memory usage
+
+## Architecture
+
+The plugin consists of two main components:
+
+### SparkMonitorPlugin
+- Extends the Toree `Plugin` class
+- Registers the "SparkMonitor" comm target during initialization
+- Listens for the "sparkReady" event to register the SparkListener
+- Manages the communication channel lifecycle
+
+### JupyterSparkMonitorListener
+- Extends Spark's `SparkListener` class
+- Monitors comprehensive Spark events and sends updates through the comm 
channel
+- Handles communication failures gracefully
+- Provides detailed event tracking with JSON-formatted messages
+- Includes active stage monitoring with periodic updates
+
+## Usage
+
+### Plugin Registration
+
+The plugin is automatically discovered and loaded by Toree's plugin system. No 
manual registration is required.
+
+### Client Communication
+
+Clients can connect to the SparkMonitor by opening a comm with the target name 
"SparkMonitor":
+
+```python
+# Example using Jupyter notebook
+from IPython.display import display
+import ipywidgets as widgets
+from traitlets import Unicode
+import json
+from ipykernel.comm import Comm
+
+# Create a comm to connect to SparkMonitor
+comm = Comm(target_name='SparkMonitor')
+
+def handle_spark_event(msg):
+    data = msg['content']['data']
+    event_type = data.get('msgtype', 'unknown')
+    print(f"Spark Event: {event_type}")
+    print(f"Details: {data}")
+
+comm.on_msg(handle_spark_event)
+```
+
+### Event Types
+
+The plugin sends the following types of events:
+
+#### Application Events
+- **sparkApplicationStart**: Fired when a Spark application starts
+  - `startTime`: Application start timestamp
+  - `appId`: Application identifier
+  - `appAttemptId`: Application attempt identifier
+  - `appName`: Application name
+  - `sparkUser`: Spark user
+
+- **sparkApplicationEnd**: Fired when a Spark application ends
+  - `endTime`: Application end timestamp
+
+#### Job Events
+- **sparkJobStart**: Fired when a Spark job starts
+  - `jobId`: Job identifier
+  - `jobGroup`: Job group identifier
+  - `status`: Job status (RUNNING)
+  - `submissionTime`: Job submission timestamp
+  - `stageIds`: Array of stage IDs
+  - `stageInfos`: Detailed stage information
+  - `numTasks`: Total number of tasks
+  - `totalCores`: Total available cores
+  - `numExecutors`: Number of executors
+  - `name`: Job name/description
+
+- **sparkJobEnd**: Fired when a Spark job ends
+  - `jobId`: Job identifier
+  - `status`: Job completion status (COMPLETED/FAILED)
+  - `completionTime`: Job completion timestamp
+
+#### Stage Events
+- **sparkStageSubmitted**: Fired when a stage is submitted
+  - `stageId`: Stage identifier
+  - `stageAttemptId`: Stage attempt identifier
+  - `name`: Stage name
+  - `numTasks`: Number of tasks in the stage
+  - `parentIds`: Parent stage IDs
+  - `submissionTime`: Submission timestamp
+  - `jobIds`: Associated job IDs
+
+- **sparkStageCompleted**: Fired when a stage completes
+  - `stageId`: Stage identifier
+  - `stageAttemptId`: Stage attempt identifier
+  - `completionTime`: Completion timestamp
+  - `submissionTime`: Submission timestamp
+  - `numTasks`: Total number of tasks
+  - `numFailedTasks`: Number of failed tasks
+  - `numCompletedTasks`: Number of completed tasks
+  - `status`: Stage status (COMPLETED/FAILED)
+  - `jobIds`: Associated job IDs
+
+- **sparkStageActive**: Periodic updates for active stages
+  - `stageId`: Stage identifier
+  - `stageAttemptId`: Stage attempt identifier
+  - `name`: Stage name
+  - `parentIds`: Parent stage IDs
+  - `numTasks`: Total number of tasks
+  - `numActiveTasks`: Number of currently active tasks
+  - `numFailedTasks`: Number of failed tasks
+  - `numCompletedTasks`: Number of completed tasks
+  - `jobIds`: Associated job IDs
+
+#### Task Events
+- **sparkTaskStart**: Fired when a task starts
+  - `launchTime`: Task launch timestamp
+  - `taskId`: Task identifier
+  - `stageId`: Parent stage identifier
+  - `stageAttemptId`: Stage attempt identifier
+  - `index`: Task index
+  - `attemptNumber`: Task attempt number
+  - `executorId`: Executor identifier
+  - `host`: Host name
+  - `status`: Task status
+  - `speculative`: Whether task is speculative
+
+- **sparkTaskEnd**: Fired when a task ends
+  - `launchTime`: Task launch timestamp
+  - `finishTime`: Task finish timestamp
+  - `taskId`: Task identifier
+  - `stageId`: Parent stage identifier
+  - `taskType`: Task type
+  - `stageAttemptId`: Stage attempt identifier
+  - `index`: Task index
+  - `attemptNumber`: Task attempt number
+  - `executorId`: Executor identifier
+  - `host`: Host name
+  - `status`: Task status
+  - `speculative`: Whether task is speculative
+  - `errorMessage`: Error message (if failed)
+  - `metrics`: Detailed task metrics (execution time, shuffle data, memory 
usage, etc.)
+
+#### Executor Events
+- **sparkExecutorAdded**: Fired when an executor is added
+  - `executorId`: Executor identifier
+  - `time`: Addition timestamp
+  - `host`: Host name
+  - `numCores`: Number of cores
+  - `totalCores`: Total cores across all executors
+
+- **sparkExecutorRemoved**: Fired when an executor is removed
+  - `executorId`: Executor identifier
+  - `time`: Removal timestamp
+  - `totalCores`: Remaining total cores
+
+## Example Spark Code
+
+To see the SparkMonitor in action, run some Spark operations:
+
+```scala
+// Create an RDD and perform some operations
+val rdd = sc.parallelize(1 to 1000, 10)
+val result = rdd.map(_ * 2).filter(_ > 100).collect()
+
+// The SparkMonitor will send notifications about:
+// - Job start/end
+// - Stage submissions/completions
+// - Task executions
+```
+
+## Configuration
+
+The plugin doesn't require any specific configuration. It automatically:
+- Registers the "SparkMonitor" comm target during kernel initialization
+- Registers the JupyterSparkMonitorListener when Spark becomes ready
+- Handles comm connection lifecycle automatically
+- Provides periodic active stage monitoring (every 1 second)
+
+### Configurable Parameters
+
+The following parameters can be adjusted by modifying the 
JupyterSparkMonitorListener class:
+- `sparkStageActiveTasksMaxMessages`: Maximum number of task messages to 
buffer (default: 250)
+- `sparkStageActiveRate`: Rate of active stage monitoring in milliseconds 
(default: 1000ms)
+
+## Error Handling
+
+The plugin includes comprehensive error handling:
+- Graceful handling of SparkContext unavailability
+- Safe communication channel operations
+- Logging of errors without affecting kernel stability
+
+## Development
+
+To extend the plugin:
+
+1. Add new event handlers to `JupyterSparkMonitorListener`
+2. Modify the `send` method to include additional data
+3. Update the comm message handlers in `SparkMonitorPlugin`
+
+## Testing
+
+Run the plugin tests using:
+
+```bash
+sbt "project plugins" test
+```
+
+The test suite includes:
+- Plugin initialization tests
+- SparkListener registration tests
+- Communication channel tests
+- Error handling tests
\ No newline at end of file
diff --git 
a/spark-monitor-plugin/src/main/scala/org/apache/toree/plugins/sparkmonitor/SparkMonitorPlugin.scala
 
b/spark-monitor-plugin/src/main/scala/org/apache/toree/plugins/sparkmonitor/SparkMonitorPlugin.scala
new file mode 100644
index 00000000..d2cbe308
--- /dev/null
+++ 
b/spark-monitor-plugin/src/main/scala/org/apache/toree/plugins/sparkmonitor/SparkMonitorPlugin.scala
@@ -0,0 +1,276 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License
+ */
+
+package org.apache.toree.plugins.sparkmonitor
+
+import org.apache.toree.plugins.{AllInterpretersReady, Plugin, PluginManager, 
SparkReady}
+import org.apache.toree.plugins.annotations.{Event, Init}
+import org.apache.toree.plugins.dependencies.Dependency
+import org.apache.toree.kernel.api.KernelLike
+import org.apache.toree.kernel.api.Kernel
+import org.apache.toree.comm.{CommRegistrar, CommWriter}
+import org.apache.toree.kernel.protocol.v5.{MsgData, UUID}
+import org.apache.toree.utils.ScheduledTaskManager
+import org.apache.spark.SparkContext
+import org.apache.log4j.Logger
+
+import scala.util.Try
+import scala.reflect.runtime.universe
+import java.lang.reflect.Field
+
+/**
+ * Plugin that registers a JupyterSparkMonitorListener with the SparkContext once Spark
+ * is available and provides communication through a "SparkMonitor" comm target.
+ *
+ * Lifecycle, as implemented below:
+ *  - on init, the comm target is registered and a background task starts polling for
+ *    an active SparkContext;
+ *  - when a running SparkContext is found, polling stops and a SparkReady event is fired
+ *    through the plugin manager;
+ *  - on "sparkReady", the listener is attached to the kernel's SparkContext (at most once
+ *    per SparkContext instance).
+ */
+class SparkMonitorPlugin extends Plugin {
+
+  private val logger = Logger.getLogger(this.getClass.getName)
+  private var sparkMonitorListener: Option[JupyterSparkMonitorListener] = None
+  // SparkContext the current listener is attached to; used to avoid attaching it twice.
+  private var listenerContext: Option[SparkContext] = None
+  // Written by the comm handlers and read through the callback handed to the listener,
+  // so it is volatile to make updates visible across threads.
+  @volatile private var commWriter: Option[CommWriter] = None
+  private val taskManager = new ScheduledTaskManager()
+  private var sparkContextMonitorTaskId: Option[String] = None
+  private var currentKernel: Option[KernelLike] = None
+  private var pluginManager: Option[PluginManager] = None
+
+  // Communication target name - extracted as constant for better maintainability
+  private val COMM_TARGET_NAME = "SparkMonitor"
+
+  /**
+   * Initialize the plugin by registering the SparkMonitor comm target and starting
+   * the background SparkContext check. Any failure is logged and not rethrown.
+   *
+   * @param kernel The kernel instance; it is cast to the concrete `Kernel` type
+   */
+  @Init
+  def initializePlugin(kernel: KernelLike): Unit = {
+    logger.info(s"Initializing SparkMonitor plugin with comm target: $COMM_TARGET_NAME")
+
+    Try {
+      // Cast to the concrete Kernel type to access the comm property. This is why the
+      // plugin needs the toree-kernel module.
+      val concreteKernel = kernel.asInstanceOf[Kernel]
+      val commManager = concreteKernel.comm
+
+      // Register comm target and attach the open/msg/close handlers
+      val commRegistrar = commManager.register(COMM_TARGET_NAME)
+      setupCommHandlers(commRegistrar)
+
+      // Start background process to monitor SparkContext creation
+      startSparkContextMonitoring()
+
+      logger.info("SparkMonitor plugin initialized successfully")
+    }.recover {
+      case ex => logger.error(s"Failed to initialize SparkMonitor plugin: ${ex.getMessage}", ex)
+    }
+  }
+
+  /**
+   * Initialize the plugin manager reference.
+   *
+   * @param manager The plugin manager instance
+   */
+  @Init
+  def initializePluginManager(manager: PluginManager): Unit = {
+    logger.info("Initializing PluginManager reference")
+    pluginManager = Some(manager)
+  }
+
+  /**
+   * Sets up all communication handlers for the registered comm target.
+   *
+   * @param commRegistrar Registrar returned when the comm target was registered
+   */
+  private def setupCommHandlers(commRegistrar: CommRegistrar): Unit = {
+    // Open handler: remember the writer and acknowledge the connection
+    commRegistrar.addOpenHandler { (commWriter: CommWriter, commId: UUID, targetName: String, data: MsgData) =>
+      Try {
+        logger.info(s"SparkMonitor comm opened - ID: $commId, Target: $targetName")
+        this.commWriter = Some(commWriter)
+
+        // Send initial connection message
+        val message = MsgData("msgtype" -> "commopen")
+        commWriter.writeMsg(message)
+      }.recover {
+        case ex => logger.warn(s"Error in comm open handler: ${ex.getMessage}", ex)
+      }
+    }
+
+    // Message handler: incoming messages are only logged for now
+    commRegistrar.addMsgHandler { (commWriter: CommWriter, commId: UUID, data: MsgData) =>
+      logger.debug(s"SparkMonitor received message from comm $commId: $data")
+      // Handle incoming messages from client if needed
+      // This can be extended for bidirectional communication
+    }
+
+    // Close handler: forget the writer so nothing is sent on a closed comm
+    commRegistrar.addCloseHandler { (commWriter: CommWriter, commId: UUID, data: MsgData) =>
+      logger.info(s"SparkMonitor comm closed - ID: $commId")
+      this.commWriter = None
+    }
+  }
+
+  /**
+   * Starts a background task to monitor SparkContext creation.
+   * The task runs periodically and checks, via reflection, whether a SparkContext
+   * has become available.
+   */
+  private def startSparkContextMonitoring(): Unit = {
+    logger.info("Starting SparkContext monitoring background task")
+
+    val taskId = taskManager.addTask(
+      executionDelay = 1000, // Start checking after 1 second
+      timeInterval = 2000,    // Check every 2 seconds
+      task = {
+        try {
+          // Logged at debug level: this runs every 2 seconds until Spark is found.
+          logger.debug("Task execution started - this should appear every 2 seconds")
+          checkSparkContextAndNotify()
+          logger.debug("Task execution completed")
+        } catch {
+          case ex: Exception =>
+            logger.error("Task execution failed", ex)
+        }
+      }
+    )
+
+    sparkContextMonitorTaskId = Some(taskId)
+    logger.debug(s"SparkContext monitoring task started with ID: $taskId")
+  }
+
+  /**
+   * Checks whether a SparkContext is available. Once a context that is not stopped
+   * is found, stops the monitoring task and fires the SparkReady event.
+   * Errors are logged at debug level and otherwise ignored so the next run can retry.
+   */
+  private def checkSparkContextAndNotify(): Unit = {
+    Try {
+      logger.debug("checkSparkContextAndNotify is running")
+      getActiveSparkContext() match {
+        case Some(sparkContext) if !sparkContext.isStopped =>
+          logger.info("SparkContext detected! Firing SparkReady event.")
+
+          // Stop the monitoring task since SparkContext is now available
+          stopSparkContextMonitoring()
+
+          // Fire SparkReady event through plugin manager to notify all plugins
+          fireSparkReadyEvent()
+
+        case Some(_) =>
+          logger.debug("SparkContext exists but is stopped, continuing to monitor...")
+
+        case None =>
+          logger.debug("No SparkContext found, continuing to monitor...")
+      }
+    }.recover {
+      case ex =>
+        logger.debug(s"Error checking SparkContext availability: ${ex.getMessage}")
+    }
+  }
+
+  /**
+   * Uses reflection to read the private activeContext field of the SparkContext
+   * companion object, so that looking for a context does not create one.
+   *
+   * @return the active SparkContext, or None when there is none or reflection fails
+   */
+  private def getActiveSparkContext(): Option[SparkContext] = {
+    Try {
+      val runtimeMirror = universe.runtimeMirror(getClass.getClassLoader)
+      val moduleSymbol = runtimeMirror.staticModule("org.apache.spark.SparkContext")
+      val moduleMirror = runtimeMirror.reflectModule(moduleSymbol)
+      val sparkContext = moduleMirror.instance
+
+      // Name-mangled form of the private field as it appears on the companion class.
+      val activeContextField: Field = sparkContext.getClass().getDeclaredField("org$apache$spark$SparkContext$$activeContext")
+      activeContextField.setAccessible(true)
+
+      val activeContextRef = activeContextField.get(sparkContext).asInstanceOf[java.util.concurrent.atomic.AtomicReference[SparkContext]]
+      Option(activeContextRef.get())
+    }.recover {
+      case ex =>
+        logger.error(s"Failed to access activeContext field via reflection: ${ex.getMessage} \n $ex")
+        None
+    }.getOrElse(None)
+  }
+
+  /**
+   * Fires a SparkReady event through the plugin manager to notify all listening plugins.
+   * Logs a warning when no plugin manager has been provided yet.
+   */
+  private def fireSparkReadyEvent(): Unit = {
+    Try {
+      // The event is fired without extra dependencies attached.
+      pluginManager match {
+        case Some(manager) =>
+          manager.fireEvent(SparkReady)
+          logger.info("SparkReady event fired to all plugins")
+        case _ =>
+          logger.warn("Cannot fire SparkReady event: kernel or plugin manager not available")
+      }
+    }.recover {
+      case ex => logger.warn(s"Failed to fire SparkReady event: ${ex.getMessage}")
+    }
+  }
+
+  /**
+   * Stops the SparkContext monitoring background task, if one is registered.
+   */
+  private def stopSparkContextMonitoring(): Unit = {
+    sparkContextMonitorTaskId.foreach { taskId =>
+      if (taskManager.removeTask(taskId)) {
+        logger.info(s"SparkContext monitoring task stopped (ID: $taskId)")
+      }
+      sparkContextMonitorTaskId = None
+    }
+  }
+
+  /**
+   * Handle the SparkReady event to register the JupyterSparkMonitorListener.
+   *
+   * This plugin fires SparkReady itself once it detects a SparkContext, so the event
+   * can arrive from more than one source. The listener is therefore attached at most
+   * once per SparkContext instance; attaching it again would make every Spark event
+   * be reported twice. The method is synchronized because it may be invoked from the
+   * background monitoring task as well as from other event sources.
+   *
+   * @param kernel The kernel whose SparkContext receives the listener
+   */
+  @Event(name = "sparkReady")
+  def onReady(kernel: KernelLike): Unit = synchronized {
+    logger.info("SparkReady event received, registering JupyterSparkMonitorListener")
+
+    // Stop monitoring task if still running since SparkContext is ready
+    stopSparkContextMonitoring()
+
+    Try {
+      val sparkContext = kernel.sparkContext
+      if (listenerContext.exists(_ eq sparkContext)) {
+        logger.info("JupyterSparkMonitorListener already registered for this SparkContext, skipping")
+      } else {
+        // Pass a callback function that always gets the current commWriter
+        val listener = new JupyterSparkMonitorListener(() => commWriter)
+
+        sparkContext.addSparkListener(listener)
+        sparkMonitorListener = Some(listener)
+        listenerContext = Some(sparkContext)
+
+        logger.info("JupyterSparkMonitorListener registered successfully")
+        // notifySparkListenerRegistration()
+      }
+    }.recover {
+      case ex => logger.error(s"Failed to register JupyterSparkMonitorListener: ${ex.getMessage}", ex)
+    }
+  }
+
+  /**
+   * Notifies clients that the Spark listener has been registered.
+   * Does nothing when no comm is currently open.
+   */
+  private def notifySparkListenerRegistration(): Unit = {
+    commWriter.foreach { writer =>
+      Try {
+        val message = MsgData("text/plain" -> "JupyterSparkMonitorListener registered")
+        writer.writeMsg(message)
+      }.recover {
+        case ex => logger.warn(s"Failed to send SparkListener registration notification: ${ex.getMessage}")
+      }
+    }
+  }
+}
\ No newline at end of file

Reply via email to