anishshri-db commented on code in PR #47778: URL: https://github.com/apache/spark/pull/47778#discussion_r1721174715
########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala: ########## @@ -498,116 +492,63 @@ private[sql] class RocksDBStateStoreProvider } /** - * Class for column family related utility functions. - * Verification functions for column family names, column family operation validations etc. + * Function to verify invariants for column family based operations + * such as get, put, remove etc. + * + * @param operationName - name of the store operation + * @param colFamilyName - name of the column family */ - private object ColumnFamilyUtils { - private val multColFamiliesDisabledStr = "multiple column families is disabled in " + - "RocksDBStateStoreProvider" - - /** - * Function to verify invariants for column family based operations - * such as get, put, remove etc. - * - * @param operationName - name of the store operation - * @param colFamilyName - name of the column family - */ - def verifyColFamilyOperations( - operationName: String, - colFamilyName: String): Unit = { - if (colFamilyName != StateStore.DEFAULT_COL_FAMILY_NAME) { - // if the state store instance does not support multiple column families, throw an exception - if (!useColumnFamilies) { - throw StateStoreErrors.unsupportedOperationException(operationName, - multColFamiliesDisabledStr) - } - - // if the column family name is empty or contains leading/trailing whitespaces, throw an - // exception - if (colFamilyName.isEmpty || colFamilyName.trim != colFamilyName) { - throw StateStoreErrors.cannotUseColumnFamilyWithInvalidName(operationName, colFamilyName) - } - - // if the column family does not exist, throw an exception - if (!checkColFamilyExists(colFamilyName)) { - throw StateStoreErrors.unsupportedOperationOnMissingColumnFamily(operationName, - colFamilyName) - } - } - } - - /** - * Function to verify invariants for column family creation or deletion operations. 
- * - * @param operationName - name of the store operation - * @param colFamilyName - name of the column family - */ - private def verifyColFamilyCreationOrDeletion( - operationName: String, - colFamilyName: String, - isInternal: Boolean = false): Unit = { + private def verifyColFamilyOperations( + operationName: String, + colFamilyName: String): Unit = { + if (colFamilyName != StateStore.DEFAULT_COL_FAMILY_NAME) { // if the state store instance does not support multiple column families, throw an exception if (!useColumnFamilies) { throw StateStoreErrors.unsupportedOperationException(operationName, multColFamiliesDisabledStr) } - // if the column family name is empty or contains leading/trailing whitespaces - // or using the reserved "default" column family, throw an exception - if (colFamilyName.isEmpty - || colFamilyName.trim != colFamilyName - || colFamilyName == StateStore.DEFAULT_COL_FAMILY_NAME) { + // if the column family name is empty or contains leading/trailing whitespaces, throw an + // exception + if (colFamilyName.isEmpty || colFamilyName.trim != colFamilyName) { throw StateStoreErrors.cannotUseColumnFamilyWithInvalidName(operationName, colFamilyName) } - // if the column family is not internal and uses reserved characters, throw an exception - if (!isInternal && colFamilyName.charAt(0) == '_') { - throw StateStoreErrors.cannotCreateColumnFamilyWithReservedChars(colFamilyName) + // if the column family does not exist, throw an exception + if (!rocksDB.checkColFamilyExists(colFamilyName)) { + throw StateStoreErrors.unsupportedOperationOnMissingColumnFamily(operationName, + colFamilyName) } } + } - /** - * Check whether the column family name is for internal column families. 
- * - * @param cfName - column family name - * @return - true if the column family is for internal use, false otherwise - */ - def checkInternalColumnFamilies(cfName: String): Boolean = cfName.charAt(0) == '_' - - /** - * Create RocksDB column family, if not created already - */ - def createColFamilyIfAbsent(colFamilyName: String, isInternal: Boolean = false): - Option[Short] = { - verifyColFamilyCreationOrDeletion("create_col_family", colFamilyName, isInternal) - if (!checkColFamilyExists(colFamilyName)) { - val newColumnFamilyId = colFamilyId.incrementAndGet().toShort - colFamilyNameToIdMap.putIfAbsent(colFamilyName, newColumnFamilyId) - Option(newColumnFamilyId) - } else None + /** + * Function to verify invariants for column family creation or deletion operations. + * + * @param operationName - name of the store operation + * @param colFamilyName - name of the column family + */ + private def verifyColFamilyCreationOrDeletion( + operationName: String, + colFamilyName: String, + isInternal: Boolean = false): Unit = { + // if the state store instance does not support multiple column families, throw an exception + if (!useColumnFamilies) { + throw StateStoreErrors.unsupportedOperationException(operationName, + multColFamiliesDisabledStr) } - /** - * Remove RocksDB column family, if exists - */ - def removeColFamilyIfExists(colFamilyName: String): Boolean = { - verifyColFamilyCreationOrDeletion("remove_col_family", colFamilyName) - if (checkColFamilyExists(colFamilyName)) { - colFamilyNameToIdMap.remove(colFamilyName) - true - } else { - false - } + // if the column family name is empty or contains leading/trailing whitespaces + // or using the reserved "default" column family, throw an exception + if (colFamilyName.isEmpty + || colFamilyName.trim != colFamilyName Review Comment: Let's use `(` and `)` explicitly? -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org