anishshri-db commented on code in PR #47778:
URL: https://github.com/apache/spark/pull/47778#discussion_r1719373790


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -166,6 +170,81 @@ class RocksDB(
   @GuardedBy("acquireLock")
   @volatile private var acquiredThreadInfo: AcquiredThreadInfo = _
 
+  // This is accessed and updated only between load and acquire
+  // which means it is implicitly guarded by acquireLock
+  private val colFamilyNameToIdMap = new ConcurrentHashMap[String, Short]()
+
+  private val defaultColumnFamilyIdMapping =
+    Map(StateStore.DEFAULT_COL_FAMILY_NAME -> StateStore.DEFAULT_COL_FAMILY_ID).asJava
+
+  private val maxColumnFamilyId: AtomicInteger = new AtomicInteger(0)
+
+  private val shouldForceSnapshot: AtomicBoolean = new AtomicBoolean(false)
+
+  def getColFamilyNameToIdMap: Map[String, Short] = {
+    colFamilyNameToIdMap.asScala.toMap
+  }
+
+  /**
+   * Check whether the column family name is for internal column families.
+   *
+   * @param cfName - column family name
+   * @return - true if the column family is for internal use, false otherwise
+   */
+  def checkInternalColumnFamilies(cfName: String): Boolean = cfName.charAt(0) == '_'
+
+  /**
+   * Create RocksDB column family, if not created already
+   */
+  def createColFamilyIfAbsent(colFamilyName: String, isInternal: Boolean = false): Short = {
+    if (!checkColFamilyExists(colFamilyName)) {
+      val newColumnFamilyId = maxColumnFamilyId.incrementAndGet().toShort
+      colFamilyNameToIdMap.putIfAbsent(colFamilyName, newColumnFamilyId)
+      shouldForceSnapshot.set(true)
+      newColumnFamilyId
+    } else colFamilyNameToIdMap.get(colFamilyName)

Review Comment:
   nit: can we put the `else` branch inside braces?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to