haiyangsun-db commented on code in PR #55712:
URL: https://github.com/apache/spark/pull/55712#discussion_r3207316040


##########
udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/UDFDispatcherManager.scala:
##########
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.udf.worker.core
+
+import java.util.{ArrayList, HashMap}
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.udf.worker.UDFWorkerSpecification
+
+/**
+ * :: Experimental :: Creates [[WorkerSession]] instances for a given
+ * [[UDFWorkerSpecification]], managing [[WorkerDispatcher]] instances and
+ * their lifecycle internally.
+ *
+ * Dispatchers are cached by spec (protobuf value equality) and reused across
+ * sessions. The manager tracks the number of active sessions per dispatcher
+ * via [[WorkerSession#addSessionCompletionListener]]. When the last session
+ * for a dispatcher is closed, the entry is removed and
+ * [[UDFDispatcherFactory#onAllDispatcherSessionsClosed]] is called.
+ *
+ * You might be wondering why the Dispatcher does not track the number of
+ * active sessions itself. The reason is that this would create an
+ * unavoidable race condition: Clients can provide different worker
+ * specs. Therefore, different dispatchers may be required, which cannot all
+ * exist for the whole Spark lifetime -> Dispatchers need to be removed/terminated
+ * at some point. If Dispatchers were to track their active sessions themselves
+ * and we would use this to decide on the dispatcher lifetime, it can always
+ * happen that there are concurrent [[createSession]] requests while
+ * the Dispatcher is being disposed of - which would create session
+ * initialization errors and may cause Spark task/query failures.
+ * Instead, we track the active sessions per Dispatcher globally
+ * in this manager.
+ *
+ * Thread safety: a single lock guards all state -- dispatchers, active
+ * sessions, and the stopping flag.
+ */
+@Experimental
+class UDFDispatcherManager(
+    private val dispatcherFactory: UDFDispatcherFactory,
+    workerLogger: WorkerLogger = WorkerLogger.NoOp
+) {
+
+  private val logger: WorkerLogger =
+    workerLogger.forClass(getClass)
+
+  /*
+   * Why do we need an [[activeSessionCount]] and an [[activeSessions]]
+   * list? [[activeSessionCount]] is per dispatcher. [[activeSessions]]
+   * is global and allows us to perform session cleanup on [[stop]].
+   * Moreover, this distinction allows us to create sessions without
+   * requiring a lock on [[lock]].
+   */
+  private class DispatcherEntry(val dispatcher: WorkerDispatcher) {
+    var activeSessionCount: Int = 0
+  }
+
+  // All fields below are guarded by `lock`.
+  private val lock = new Object
+  private val dispatchers =
+    new HashMap[UDFWorkerSpecification, DispatcherEntry]()
+  private val activeSessions = new ArrayList[WorkerSession]()

Review Comment:
   For example, the dispatcher manager here creates dispatchers, so it also takes care of disposing of them.
   
   Likewise, a dispatcher spawns workers and sessions, so the disposal of sessions and workers should live in the dispatcher.
   
   Because creation and disposal logic are often related, it is better to colocate them.
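
   The layering described above could look roughly like the sketch below. All names here (`SketchSession`, `SketchDispatcher`, `SketchManager`) are hypothetical stand-ins and not the classes in this PR, and the spec is simplified to a `String`. The point is only that each level disposes of what it creates.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for illustration; not the PR's actual classes.
final class SketchSession(val id: Int) {
  @volatile private var closed = false
  def isClosed: Boolean = closed
  def close(): Unit = { closed = true }
}

// The dispatcher creates sessions, so it also disposes of them.
final class SketchDispatcher(val spec: String) {
  private val sessions = mutable.ArrayBuffer.empty[SketchSession]
  private var nextId = 0
  private var closed = false

  def createSession(): SketchSession = synchronized {
    require(!closed, s"dispatcher for '$spec' is closed")
    val session = new SketchSession(nextId)
    nextId += 1
    sessions += session
    session
  }

  // Closes every session this dispatcher created, then marks itself closed.
  def close(): Unit = synchronized {
    sessions.foreach(_.close())
    sessions.clear()
    closed = true
  }

  def isClosed: Boolean = synchronized { closed }
}

// The manager creates dispatchers, so it also disposes of them.
final class SketchManager {
  private val dispatchers = mutable.HashMap.empty[String, SketchDispatcher]

  // Reuses the cached dispatcher for a spec, creating it on first use.
  def dispatcherFor(spec: String): SketchDispatcher = synchronized {
    dispatchers.getOrElseUpdate(spec, new SketchDispatcher(spec))
  }

  // Closes every dispatcher; each dispatcher closes its own sessions.
  def stop(): Unit = synchronized {
    dispatchers.values.foreach(_.close())
    dispatchers.clear()
  }
}
```

   This only shows the ownership layering. It does not decide when an idle dispatcher should be removed, which is the race the class comment is concerned with.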



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

