Re: [PR] [SPARK-47250][SS] Add additional validations and NERF changes for RocksDB state provider and use of column families [spark]

2024-03-11 Thread via GitHub


HeartSaVioR closed pull request #45360: [SPARK-47250][SS] Add additional 
validations and NERF changes for RocksDB state provider and use of column 
families
URL: https://github.com/apache/spark/pull/45360


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47250][SS] Add additional validations and NERF changes for RocksDB state provider and use of column families [spark]

2024-03-11 Thread via GitHub


HeartSaVioR commented on PR #45360:
URL: https://github.com/apache/spark/pull/45360#issuecomment-1990520156

   Thanks! Merging to master.





Re: [PR] [SPARK-47250][SS] Add additional validations and NERF changes for RocksDB state provider and use of column families [spark]

2024-03-11 Thread via GitHub


anishshri-db commented on code in PR #45360:
URL: https://github.com/apache/spark/pull/45360#discussion_r1520296947


##
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##
@@ -582,7 +636,7 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
 assert(iterator(db, colFamily2).isEmpty)
   }
   assert(ex.isInstanceOf[RuntimeException])
-  assert(ex.getMessage.contains("does not exist"))
+  assert(ex.getMessage.contains("missing column family"))

Review Comment:
   Done



##
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala:
##
@@ -134,6 +134,46 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
     }
   }
 
+  private def verifyStoreOperationUnsupported()(testFn: => Unit): Unit = {
+    val ex = intercept[UnsupportedOperationException] {

Review Comment:
   Done






Re: [PR] [SPARK-47250][SS] Add additional validations and NERF changes for RocksDB state provider and use of column families [spark]

2024-03-11 Thread via GitHub


anishshri-db commented on code in PR #45360:
URL: https://github.com/apache/spark/pull/45360#discussion_r1520296736


##
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##
@@ -536,6 +536,67 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
     }
   }
 
+  testWithColumnFamilies(s"RocksDB: column family creation with invalid names",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
+    val remoteDir = Utils.createTempDir().toString
+    new File(remoteDir).delete() // to make sure that the directory gets created
+
+    val conf = RocksDBConf().copy()
+    withDB(remoteDir, conf = conf, useColumnFamilies = colFamiliesEnabled) { db =>
+      Seq("default", "", " ", "", " default", " default ").foreach { colFamilyName =>
+        val ex = intercept[Exception] {

Review Comment:
   Done



##
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##
@@ -536,6 +536,67 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
     }
   }
 
+  testWithColumnFamilies(s"RocksDB: column family creation with invalid names",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
+    val remoteDir = Utils.createTempDir().toString
+    new File(remoteDir).delete() // to make sure that the directory gets created
+
+    val conf = RocksDBConf().copy()
+    withDB(remoteDir, conf = conf, useColumnFamilies = colFamiliesEnabled) { db =>
+      Seq("default", "", " ", "", " default", " default ").foreach { colFamilyName =>
+        val ex = intercept[Exception] {
+          db.createColFamilyIfAbsent(colFamilyName)
+        }
+        ex.getCause.isInstanceOf[UnsupportedOperationException]
+      }
+    }
+  }
+
+  private def verifyStoreOperationUnsupported(
+      operationName: String)
+      (testFn: => Unit): Unit = {
+    val ex = intercept[UnsupportedOperationException] {

Review Comment:
   Done






Re: [PR] [SPARK-47250][SS] Add additional validations and NERF changes for RocksDB state provider and use of column families [spark]

2024-03-08 Thread via GitHub


HeartSaVioR commented on code in PR #45360:
URL: https://github.com/apache/spark/pull/45360#discussion_r1518475424


##
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##
@@ -582,7 +636,7 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
 assert(iterator(db, colFamily2).isEmpty)
   }
   assert(ex.isInstanceOf[RuntimeException])
-  assert(ex.getMessage.contains("does not exist"))
+  assert(ex.getMessage.contains("missing column family"))

Review Comment:
   ditto



##
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##
@@ -536,6 +536,67 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
     }
   }
 
+  testWithColumnFamilies(s"RocksDB: column family creation with invalid names",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
+    val remoteDir = Utils.createTempDir().toString
+    new File(remoteDir).delete() // to make sure that the directory gets created
+
+    val conf = RocksDBConf().copy()
+    withDB(remoteDir, conf = conf, useColumnFamilies = colFamiliesEnabled) { db =>
+      Seq("default", "", " ", "", " default", " default ").foreach { colFamilyName =>
+        val ex = intercept[Exception] {

Review Comment:
   Verifying that an exception belongs to the error class framework needs to be
done with checkError.
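
For context, a minimal self-contained sketch of what an error-class-based check verifies, as opposed to a bare `isInstanceOf` expression whose Boolean result is discarded. `SketchError`, `checkSketchError`, `createColFamily` and the error class string are hypothetical stand-ins written for this illustration only; they are not Spark's `SparkThrowable` or `checkError`, whose real signatures should be taken from the Spark test utilities.

```scala
import scala.util.Try

// Hypothetical stand-in for an exception that carries an error class and parameters.
final class SketchError(val errorClass: String, val parameters: Map[String, String])
  extends UnsupportedOperationException(s"[$errorClass] $parameters")

object CheckErrorSketch {
  // Hypothetical helper: compares the error class and parameters, not only the type.
  def checkSketchError(
      ex: Throwable,
      errorClass: String,
      parameters: Map[String, String]): Unit = ex match {
    case e: SketchError =>
      assert(e.errorClass == errorClass, s"unexpected error class: ${e.errorClass}")
      assert(e.parameters == parameters, s"unexpected parameters: ${e.parameters}")
    case other =>
      throw new AssertionError(s"expected SketchError, got ${other.getClass.getName}")
  }

  // Hypothetical operation that rejects the reserved column family name.
  def createColFamily(name: String): Unit = {
    if (name == "default") {
      throw new SketchError("SKETCH_RESERVED_NAME", Map("colFamilyName" -> name))
    }
  }

  def main(args: Array[String]): Unit = {
    // Capture the failure, then verify its error class and parameters explicitly.
    val ex = Try(createColFamily("default")).failed.get
    checkSketchError(ex, "SKETCH_RESERVED_NAME", Map("colFamilyName" -> "default"))
  }
}
```

The point of the pattern is that the test fails when the wrong error class or parameters are produced, which a type check alone would not catch.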



##
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##
@@ -536,6 +536,67 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
     }
   }
 
+  testWithColumnFamilies(s"RocksDB: column family creation with invalid names",
+    TestWithBothChangelogCheckpointingEnabledAndDisabled) { colFamiliesEnabled =>
+    val remoteDir = Utils.createTempDir().toString
+    new File(remoteDir).delete() // to make sure that the directory gets created
+
+    val conf = RocksDBConf().copy()
+    withDB(remoteDir, conf = conf, useColumnFamilies = colFamiliesEnabled) { db =>
+      Seq("default", "", " ", "", " default", " default ").foreach { colFamilyName =>
+        val ex = intercept[Exception] {
+          db.createColFamilyIfAbsent(colFamilyName)
+        }
+        ex.getCause.isInstanceOf[UnsupportedOperationException]
+      }
+    }
+  }
+
+  private def verifyStoreOperationUnsupported(
+      operationName: String)
+      (testFn: => Unit): Unit = {
+    val ex = intercept[UnsupportedOperationException] {

Review Comment:
   ditto



##
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala:
##
@@ -134,6 +134,46 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
     }
   }
 
+  private def verifyStoreOperationUnsupported()(testFn: => Unit): Unit = {
+    val ex = intercept[UnsupportedOperationException] {

Review Comment:
   ditto






Re: [PR] [SPARK-47250][SS] Add additional validations and NERF changes for RocksDB state provider and use of column families [spark]

2024-03-07 Thread via GitHub


HeartSaVioR commented on PR #45360:
URL: https://github.com/apache/spark/pull/45360#issuecomment-1985157244

   Will review sooner rather than later, maybe by today.





Re: [PR] [SPARK-47250][SS] Add additional validations and NERF changes for RocksDB state provider and use of column families [spark]

2024-03-07 Thread via GitHub


HyukjinKwon commented on PR #45360:
URL: https://github.com/apache/spark/pull/45360#issuecomment-1984994353

   Is this good to go? @HeartSaVioR @rangadi 





Re: [PR] [SPARK-47250][SS] Add additional validations and NERF changes for RocksDB state provider and use of column families [spark]

2024-03-06 Thread via GitHub


anishshri-db commented on code in PR #45360:
URL: https://github.com/apache/spark/pull/45360#discussion_r1514964881


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##
@@ -246,25 +246,35 @@ class RocksDB(
     colFamilyNameToHandleMap.contains(colFamilyName)
   }
 
-  private def verifyColFamilyExists(colFamilyName: String): Unit = {
-    if (useColumnFamilies && !checkColFamilyExists(colFamilyName)) {
-      throw new RuntimeException(s"Column family with name=$colFamilyName does not exist")
+  private def verifyColFamilyExists(operationName: String, colFamilyName: String): Unit = {
+    if (colFamilyName != StateStore.DEFAULT_COL_FAMILY_NAME) {
+      if (!useColumnFamilies) {
+        throw StateStoreErrors.unsupportedOperationException(operationName,
+          "RocksDBStateStoreProvider and multiple column families disabled")
+      }
+
+      if (!checkColFamilyExists(colFamilyName)) {
+        throw StateStoreErrors.unsupportedOperationOnMissingColumnFamily(operationName,
+          colFamilyName)
+      }
     }
   }
 
   /**
    * Create RocksDB column family, if not created already
    */
   def createColFamilyIfAbsent(colFamilyName: String): Unit = {
-    if (colFamilyName == StateStore.DEFAULT_COL_FAMILY_NAME) {
-      throw new SparkUnsupportedOperationException(
-        errorClass = "_LEGACY_ERROR_TEMP_3197",
-        messageParameters = Map("colFamilyName" -> colFamilyName).toMap)
+    // Remove leading and trailing whitespaces
+    val cfName = colFamilyName.trim

Review Comment:
   Modified to check the invariants and throw an exception when they are not
met, for both the column family operations case and the creation/deletion case.






Re: [PR] [SPARK-47250][SS] Add additional validations and NERF changes for RocksDB state provider and use of column families [spark]

2024-03-05 Thread via GitHub


anishshri-db commented on code in PR #45360:
URL: https://github.com/apache/spark/pull/45360#discussion_r1513631712


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##
@@ -246,25 +246,35 @@ class RocksDB(
     colFamilyNameToHandleMap.contains(colFamilyName)
   }
 
-  private def verifyColFamilyExists(colFamilyName: String): Unit = {
-    if (useColumnFamilies && !checkColFamilyExists(colFamilyName)) {
-      throw new RuntimeException(s"Column family with name=$colFamilyName does not exist")
+  private def verifyColFamilyExists(operationName: String, colFamilyName: String): Unit = {
+    if (colFamilyName != StateStore.DEFAULT_COL_FAMILY_NAME) {
+      if (!useColumnFamilies) {
+        throw StateStoreErrors.unsupportedOperationException(operationName,
+          "RocksDBStateStoreProvider and multiple column families disabled")
+      }
+
+      if (!checkColFamilyExists(colFamilyName)) {
+        throw StateStoreErrors.unsupportedOperationOnMissingColumnFamily(operationName,
+          colFamilyName)
+      }
     }
   }
 
   /**
    * Create RocksDB column family, if not created already
    */
   def createColFamilyIfAbsent(colFamilyName: String): Unit = {
-    if (colFamilyName == StateStore.DEFAULT_COL_FAMILY_NAME) {
-      throw new SparkUnsupportedOperationException(
-        errorClass = "_LEGACY_ERROR_TEMP_3197",
-        messageParameters = Map("colFamilyName" -> colFamilyName).toMap)
+    // Remove leading and trailing whitespaces
+    val cfName = colFamilyName.trim

Review Comment:
   Hmm - user-facing APIs shouldn't actually call state store operations
directly by passing the name. The assumption is that we should never have column
families created with leading/trailing spaces. Do you think we should add an
assert to verify this and throw an exception?
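
A minimal sketch of the alternative raised here: rejecting a name that breaks the invariants instead of silently trimming it. `ColFamilyNameCheck`, `validate` and the reason strings are hypothetical names for illustration; the reserved value `"default"` is an assumption based on the invalid names used in the test hunk of this PR, standing in for `StateStore.DEFAULT_COL_FAMILY_NAME`.

```scala
object ColFamilyNameCheck {
  // Assumed reserved name, standing in for StateStore.DEFAULT_COL_FAMILY_NAME.
  val DefaultColFamilyName = "default"

  // Returns Left(reason) when the name breaks an invariant, Right(name) otherwise.
  def validate(name: String): Either[String, String] = {
    if (name.isEmpty) Left("name is empty")
    else if (name != name.trim) Left("name has leading or trailing whitespace")
    else if (name == DefaultColFamilyName) Left("name is reserved")
    else Right(name)
  }

  def main(args: Array[String]): Unit = {
    // A plain name passes; a padded name is rejected rather than rewritten.
    println(validate("countState"))
    println(validate(" default "))
  }
}
```

Because nothing is rewritten, the name a caller passes at creation time is the same name it must pass for later operations.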






Re: [PR] [SPARK-47250][SS] Add additional validations and NERF changes for RocksDB state provider and use of column families [spark]

2024-03-05 Thread via GitHub


anishshri-db commented on code in PR #45360:
URL: https://github.com/apache/spark/pull/45360#discussion_r1513624528


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##
@@ -246,25 +246,35 @@ class RocksDB(
     colFamilyNameToHandleMap.contains(colFamilyName)
   }
 
-  private def verifyColFamilyExists(colFamilyName: String): Unit = {
-    if (useColumnFamilies && !checkColFamilyExists(colFamilyName)) {
-      throw new RuntimeException(s"Column family with name=$colFamilyName does not exist")
+  private def verifyColFamilyExists(operationName: String, colFamilyName: String): Unit = {
+    if (colFamilyName != StateStore.DEFAULT_COL_FAMILY_NAME) {
+      if (!useColumnFamilies) {
+        throw StateStoreErrors.unsupportedOperationException(operationName,
+          "RocksDBStateStoreProvider and multiple column families disabled")

Review Comment:
   Done






Re: [PR] [SPARK-47250][SS] Add additional validations and NERF changes for RocksDB state provider and use of column families [spark]

2024-03-05 Thread via GitHub


sahnib commented on code in PR #45360:
URL: https://github.com/apache/spark/pull/45360#discussion_r1513594152


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##
@@ -246,25 +246,35 @@ class RocksDB(
     colFamilyNameToHandleMap.contains(colFamilyName)
   }
 
-  private def verifyColFamilyExists(colFamilyName: String): Unit = {
-    if (useColumnFamilies && !checkColFamilyExists(colFamilyName)) {
-      throw new RuntimeException(s"Column family with name=$colFamilyName does not exist")
+  private def verifyColFamilyExists(operationName: String, colFamilyName: String): Unit = {
+    if (colFamilyName != StateStore.DEFAULT_COL_FAMILY_NAME) {
+      if (!useColumnFamilies) {
+        throw StateStoreErrors.unsupportedOperationException(operationName,
+          "RocksDBStateStoreProvider and multiple column families disabled")
+      }
+
+      if (!checkColFamilyExists(colFamilyName)) {
+        throw StateStoreErrors.unsupportedOperationOnMissingColumnFamily(operationName,
+          colFamilyName)
+      }
     }
   }
 
   /**
    * Create RocksDB column family, if not created already
    */
   def createColFamilyIfAbsent(colFamilyName: String): Unit = {
-    if (colFamilyName == StateStore.DEFAULT_COL_FAMILY_NAME) {
-      throw new SparkUnsupportedOperationException(
-        errorClass = "_LEGACY_ERROR_TEMP_3197",
-        messageParameters = Map("colFamilyName" -> colFamilyName).toMap)
+    // Remove leading and trailing whitespaces
+    val cfName = colFamilyName.trim

Review Comment:
   This trimming also needs to happen when the user issues a put/get etc.
against the column family. I don't think that's happening today. In that case,
would this lead to inconsistencies?
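
A minimal sketch of the inconsistency being asked about, assuming creation trims the name while lookups use it as given. `TrimMismatchSketch` and its set of names are hypothetical and only stand in for a name-to-handle map; this is not the RocksDB state provider code.

```scala
import scala.collection.mutable

object TrimMismatchSketch {
  // Hypothetical registry standing in for a name-to-handle map.
  private val names = mutable.Set.empty[String]

  // Creation trims the name before registering it.
  def create(name: String): Unit = {
    names += name.trim
  }

  // Lookup uses the caller's name as-is, without trimming.
  def exists(name: String): Boolean = names.contains(name)

  def main(args: Array[String]): Unit = {
    create(" counts ")
    println(exists(" counts ")) // false: the stored key is "counts"
    println(exists("counts"))   // true
  }
}
```

The same string that succeeded at creation time fails at lookup time unless every operation applies the same normalization.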



##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##
@@ -246,25 +246,35 @@ class RocksDB(
     colFamilyNameToHandleMap.contains(colFamilyName)
   }
 
-  private def verifyColFamilyExists(colFamilyName: String): Unit = {
-    if (useColumnFamilies && !checkColFamilyExists(colFamilyName)) {
-      throw new RuntimeException(s"Column family with name=$colFamilyName does not exist")
+  private def verifyColFamilyExists(operationName: String, colFamilyName: String): Unit = {
+    if (colFamilyName != StateStore.DEFAULT_COL_FAMILY_NAME) {
+      if (!useColumnFamilies) {
+        throw StateStoreErrors.unsupportedOperationException(operationName,
+          "RocksDBStateStoreProvider and multiple column families disabled")

Review Comment:
   Can be reworded as `multiple column families disabled in
RocksDBStateStoreProvider`.






Re: [PR] [SPARK-47250][SS] Add additional validations and NERF changes for RocksDB state provider and use of column families [spark]

2024-03-01 Thread via GitHub


anishshri-db commented on PR #45360:
URL: https://github.com/apache/spark/pull/45360#issuecomment-1974249820

   @sahnib @HeartSaVioR - PTAL, thanks!
   
   @HeartSaVioR - let me know if you are OK with the proposed directory layout
changes, and also whether you would prefer them in a separate PR. Thanks

