Re: [PR] [SPARK-46961][SS] Using ProcessorContext to store and retrieve handle [spark]
MaxGekk closed pull request #45359: [SPARK-46961][SS] Using ProcessorContext to store and retrieve handle URL: https://github.com/apache/spark/pull/45359 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-46961][SS] Using ProcessorContext to store and retrieve handle [spark]
MaxGekk commented on PR #45359:
URL: https://github.com/apache/spark/pull/45359#issuecomment-1977192138

All GAs passed. +1, LGTM. Merging to master.
Thank you, @ericm-db and @anishshri-db for review.
Re: [PR] [SPARK-46961][SS] Using ProcessorContext to store and retrieve handle [spark]
MaxGekk commented on code in PR #45359:
URL: https://github.com/apache/spark/pull/45359#discussion_r1511578822

##
sql/api/src/main/scala/org/apache/spark/sql/errors/ExecutionErrors.scala:
##
@@ -53,6 +53,12 @@ private[sql] trait ExecutionErrors extends DataTypeErrorsBase {
       e)
   }
 
+  def stateStoreHandleNotInitialized(): SparkRuntimeException = {

Review Comment:
I see. ok.
Re: [PR] [SPARK-46961][SS] Using ProcessorContext to store and retrieve handle [spark]
ericm-db commented on code in PR #45359:
URL: https://github.com/apache/spark/pull/45359#discussion_r1511477968

##
sql/api/src/main/scala/org/apache/spark/sql/errors/ExecutionErrors.scala:
##
@@ -53,6 +53,12 @@ private[sql] trait ExecutionErrors extends DataTypeErrorsBase {
       e)
   }
 
+  def stateStoreHandleNotInitialized(): SparkRuntimeException = {

Review Comment:
@MaxGekk we can't move it to `StateStoreErrors.scala` because we can't use it in `sql/api/`, where we throw the error.
Re: [PR] [SPARK-46961][SS] Using ProcessorContext to store and retrieve handle [spark]
MaxGekk commented on code in PR #45359:
URL: https://github.com/apache/spark/pull/45359#discussion_r1511001426

##
sql/api/src/main/scala/org/apache/spark/sql/errors/ExecutionErrors.scala:
##
@@ -53,6 +53,12 @@ private[sql] trait ExecutionErrors extends DataTypeErrorsBase {
       e)
   }
 
+  def stateStoreHandleNotInitialized(): SparkRuntimeException = {

Review Comment:
@ericm-db Could you move it, please?
Re: [PR] [SPARK-46961][SS] Using ProcessorContext to store and retrieve handle [spark]
ericm-db commented on code in PR #45359:
URL: https://github.com/apache/spark/pull/45359#discussion_r1510692025

##
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala:
##
@@ -195,6 +179,20 @@ class TransformWithStateSuite extends StateStoreMetricsTest
     }
   }
 
+  test("Use statefulProcessor without transformWithState - handle should be absent") {
+    val processor = new RunningCountStatefulProcessor()
+    val ex = intercept[Exception] {
+      processor.getHandle
+    }
+    checkError(
+      ex.asInstanceOf[SparkRuntimeException],
+      errorClass = "STATE_STORE_HANDLE_NOT_INITIALIZED",
+      parameters = Map.empty
+    )
+    assert(ex.getMessage.contains("The handle has not been initialized" +

Review Comment:
Done
Re: [PR] [SPARK-46961][SS] Using ProcessorContext to store and retrieve handle [spark]
MaxGekk commented on code in PR #45359:
URL: https://github.com/apache/spark/pull/45359#discussion_r1510018920

##
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala:
##
@@ -195,6 +179,20 @@ class TransformWithStateSuite extends StateStoreMetricsTest
     }
   }
 
+  test("Use statefulProcessor without transformWithState - handle should be absent") {
+    val processor = new RunningCountStatefulProcessor()
+    val ex = intercept[Exception] {
+      processor.getHandle
+    }
+    checkError(
+      ex.asInstanceOf[SparkRuntimeException],
+      errorClass = "STATE_STORE_HANDLE_NOT_INITIALIZED",
+      parameters = Map.empty
+    )
+    assert(ex.getMessage.contains("The handle has not been initialized" +

Review Comment:
Could you remove the assert? The error text can be modified by a tech writer, or it can be translated to other languages. If tests expect specific text, that won't be possible without rewriting those tests.
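A minimal sketch of the point made in this review comment: assert on the stable error class, not on the message wording. It does not use Spark's `checkError` or `SparkRuntimeException`; `ClassifiedException`, `UninitializedProcessor`, and `ErrorClassCheck` are hypothetical stand-ins introduced only for illustration.

```scala
// Hypothetical stand-in for an exception that carries a stable error class
// alongside a human-readable message that may be reworded or translated.
final class ClassifiedException(val errorClass: String, message: String)
  extends RuntimeException(message)

// Hypothetical processor whose handle was never set.
object UninitializedProcessor {
  def getHandle: Nothing =
    throw new ClassifiedException(
      "STATE_STORE_HANDLE_NOT_INITIALIZED",
      "The handle has not been initialized for this StatefulProcessor.")
}

object ErrorClassCheck {
  // Runs the block and returns the error class of the exception it throws, if any.
  def errorClassOf(block: => Any): Option[String] =
    try { block; None }
    catch { case e: ClassifiedException => Some(e.errorClass) }
}
```

A test built this way keeps passing when the message text is edited, because only the error class identifier is compared.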
Re: [PR] [SPARK-46961][SS] Using ProcessorContext to store and retrieve handle [spark]
ericm-db commented on PR #45359:
URL: https://github.com/apache/spark/pull/45359#issuecomment-1973745035

cc @HeartSaVioR @MaxGekk
Re: [PR] [SPARK-46961][SS] Using ProcessorContext to store and retrieve handle [spark]
anishshri-db commented on code in PR #45359:
URL: https://github.com/apache/spark/pull/45359#discussion_r1509393780

##
common/utils/src/main/resources/error/error-classes.json:
##
@@ -3320,6 +3320,13 @@
     ],
     "sqlState" : "42802"
   },
+  "STATE_STORE_HANDLE_NOT_INITIALIZED" : {
+    "message" : [
+      "The handle has not been initialized for this StatefulProcessor.",
+      "Please only use the StatefulProcessor with the transformWithState operator."

Review Comment:
Nit: `within the`
Re: [PR] [SPARK-46961][SS] Using ProcessorContext to store and retrieve handle [spark]
anishshri-db commented on code in PR #45359:
URL: https://github.com/apache/spark/pull/45359#discussion_r1509389021

##
sql/api/src/main/scala/org/apache/spark/sql/errors/ExecutionErrors.scala:
##
@@ -53,6 +53,12 @@ private[sql] trait ExecutionErrors extends DataTypeErrorsBase {
       e)
   }
 
+  def stateStoreHandleNotInitialized(): SparkRuntimeException = {

Review Comment:
Can we move this to `StateStoreErrors.scala` to keep error classes in this area in a common location?
Re: [PR] [SPARK-46961][SS] Using ProcessorContext to store and retrieve handle [spark]
anishshri-db commented on code in PR #45359:
URL: https://github.com/apache/spark/pull/45359#discussion_r1509389021

##
sql/api/src/main/scala/org/apache/spark/sql/errors/ExecutionErrors.scala:
##
@@ -53,6 +53,12 @@ private[sql] trait ExecutionErrors extends DataTypeErrorsBase {
       e)
   }
 
+  def stateStoreHandleNotInitialized(): SparkRuntimeException = {

Review Comment:
Can we move this to `StateErrors.scala` to keep error classes in this area in a common location?
[PR] [SPARK-46961][SS] Using ProcessorContext to store and retrieve handle [spark]
ericm-db opened a new pull request, #45359:
URL: https://github.com/apache/spark/pull/45359

### What changes were proposed in this pull request?
Setting the processorHandle as a part of the statefulProcessor, so that the user doesn't have to explicitly keep track of it, and can instead simply call `getStatefulProcessorHandle`.

### Why are the changes needed?
This enhances the usability of the State API.

### Does this PR introduce _any_ user-facing change?
Yes, this is an API change. This enhances usability of the StatefulProcessorHandle and the TransformWithState operator.

### How was this patch tested?
Existing unit tests are sufficient.

### Was this patch authored or co-authored using generative AI tooling?
No
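A rough sketch of the usage change this description proposes, written against simplified stand-ins and not the real Spark classes: the base class holds the handle, so a user-defined processor no longer receives it in `init` or stores it in its own field. `SketchHandle`, `SketchProcessor`, and `CountingProcessor` are hypothetical names, and the shorter `getHandle`/`setHandle` names follow the later review suggestion; the getter is assumed to be what the description calls `getStatefulProcessorHandle`.

```scala
// Hypothetical, simplified stand-in for the handle; returns a state name
// where the real API would return a state object.
trait SketchHandle {
  def getValueState(name: String): String
}

// Hypothetical base class: owns the handle on behalf of every subclass.
abstract class SketchProcessor {
  private var handle: SketchHandle = null

  // Assumed to be called by the operator before init().
  final def setHandle(h: SketchHandle): Unit = { handle = h }
  final def getHandle: SketchHandle = handle

  def init(): Unit
}

class CountingProcessor extends SketchProcessor {
  var stateName: String = ""

  // No handle parameter and no user-managed field: the base class holds it.
  override def init(): Unit = {
    stateName = getHandle.getValueState("countState")
  }
}
```

In this sketch the caller that plays the operator's role sets the handle once, and user code only ever calls the getter.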
Re: [PR] [SPARK-46961][SS] Using ProcessorContext to store and retrieve handle [spark]
ericm-db closed pull request #45002: [SPARK-46961][SS] Using ProcessorContext to store and retrieve handle
URL: https://github.com/apache/spark/pull/45002
Re: [PR] [SPARK-46961][SS] Using ProcessorContext to store and retrieve handle [spark]
ericm-db commented on code in PR #45002:
URL: https://github.com/apache/spark/pull/45002#discussion_r1482110798

##
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala:
##
@@ -31,14 +31,9 @@ object TransformWithStateSuiteUtils {
 class RunningCountStatefulProcessor extends StatefulProcessor[String, String, (String, String)]
   with Logging {
   @transient private var _countState: ValueState[Long] = _
-  @transient var _processorHandle: StatefulProcessorHandle = _
-
-  override def init(
-      handle: StatefulProcessorHandle,
-      outputMode: OutputMode) : Unit = {
-    _processorHandle = handle
-    assert(handle.getQueryInfo().getBatchId >= 0)
-    _countState = _processorHandle.getValueState[Long]("countState")
+
+  override def init(outputMode: OutputMode): Unit = {

Review Comment:
Added
Re: [PR] [SPARK-46961][SS] Using ProcessorContext to store and retrieve handle [spark]
anishshri-db commented on code in PR #45002:
URL: https://github.com/apache/spark/pull/45002#discussion_r1481977985

##
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala:
##
@@ -31,14 +31,9 @@ object TransformWithStateSuiteUtils {
 class RunningCountStatefulProcessor extends StatefulProcessor[String, String, (String, String)]
   with Logging {
   @transient private var _countState: ValueState[Long] = _
-  @transient var _processorHandle: StatefulProcessorHandle = _
-
-  override def init(
-      handle: StatefulProcessorHandle,
-      outputMode: OutputMode) : Unit = {
-    _processorHandle = handle
-    assert(handle.getQueryInfo().getBatchId >= 0)
-    _countState = _processorHandle.getValueState[Long]("countState")
+
+  override def init(outputMode: OutputMode): Unit = {

Review Comment:
Can we also add a test verifying that the context is not available if it's not initialized? Maybe within `ValueStateSuite`?
Re: [PR] [SPARK-46961][SS] Using ProcessorContext to store and retrieve handle [spark]
anishshri-db commented on code in PR #45002:
URL: https://github.com/apache/spark/pull/45002#discussion_r1481976306

##
sql/api/src/main/scala/org/apache/spark/sql/streaming/StatefulProcessor.scala:
##
@@ -27,19 +27,16 @@ import org.apache.spark.annotation.{Evolving, Experimental}
  */
 @Experimental
 @Evolving
-private[sql] trait StatefulProcessor[K, I, O] extends Serializable {
+private[sql] abstract class StatefulProcessor[K, I, O] extends Serializable {

Review Comment:
I believe a trait can have default/concrete implementations too?
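For reference, a small standalone sketch of the language feature this comment relies on: a Scala trait can carry mutable state and concrete (even `final`) methods while leaving other members abstract. `HasHandle` and `Impl` are hypothetical names and the `String` handle is a placeholder; this is not the `StatefulProcessor` code from the PR.

```scala
trait HasHandle {
  // A trait may declare mutable state...
  private var handle: Option[String] = None

  // ...and concrete, final accessors that implementers cannot override.
  final def setHandle(h: String): Unit = { handle = Some(h) }
  final def getHandle: Option[String] = handle

  // Abstract member left to implementers.
  def name: String
}

class Impl extends HasHandle {
  override def name: String = "impl"
}
```

Whether the trait or the abstract class is preferable for the PR is a separate design question; the sketch only shows that getters and setters by themselves do not force the switch.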
Re: [PR] [SPARK-46961][SS] Using ProcessorContext to store and retrieve handle [spark]
ericm-db commented on code in PR #45002:
URL: https://github.com/apache/spark/pull/45002#discussion_r1481973732

##
sql/api/src/main/scala/org/apache/spark/sql/streaming/StatefulProcessor.scala:
##
@@ -27,19 +27,16 @@ import org.apache.spark.annotation.{Evolving, Experimental}
  */
 @Experimental
 @Evolving
-private[sql] trait StatefulProcessor[K, I, O] extends Serializable {
+private[sql] abstract class StatefulProcessor[K, I, O] extends Serializable {

Review Comment:
We want to provide implementation for the getters and setters, so it can no longer be a trait, right?
Re: [PR] [SPARK-46961][SS] Using ProcessorContext to store and retrieve handle [spark]
anishshri-db commented on code in PR #45002:
URL: https://github.com/apache/spark/pull/45002#discussion_r1481971683

##
sql/api/src/main/scala/org/apache/spark/sql/streaming/StatefulProcessor.scala:
##
@@ -60,4 +57,21 @@ private[sql] trait StatefulProcessor[K, I, O] extends Serializable {
    * any cleanup or teardown operations.
    */
   def close (): Unit
+
+  /**
+   * Function to set the stateful processor handle that will be used to interact with the state
+   * store and other stateful processor related operations.
+   * @param handle - instance of StatefulProcessorHandle
+   */
+  final def setStatefulProcessorHandle(handle: StatefulProcessorHandle): Unit = {
+    statefulProcessorHandle = handle
+  }
+
+  /**
+   * Function to get the stateful processor handle that will be used to interact with the state
+   * @return handle - instance of StatefulProcessorHandle
+   */
+  final def getStatefulProcessorHandle: StatefulProcessorHandle = {
+    statefulProcessorHandle

Review Comment:
Let's set the initial value to null. And if it's null, then let's raise an exception/error to say that the handle is not available?
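One way the suggestion in this comment could look, as a sketch under stated assumptions and not the merged implementation: the field starts as `null`, and the getter fails fast when nothing has been set. `GuardedHandle`, `GuardedProcessor`, and `HandleNotInitializedException` are hypothetical names; the real change reports the condition through a Spark error class.

```scala
// Hypothetical exception type; the message text is illustrative only.
final class HandleNotInitializedException
  extends RuntimeException("handle not initialized")

// Hypothetical marker type standing in for the real handle.
trait GuardedHandle

abstract class GuardedProcessor {
  // Starts as null until the operator injects a handle.
  private var handle: GuardedHandle = null

  final def setHandle(h: GuardedHandle): Unit = { handle = h }

  // Fail fast so callers never receive a null handle.
  final def getHandle: GuardedHandle = {
    if (handle == null) throw new HandleNotInitializedException
    handle
  }
}
```

Calling `getHandle` on a processor that was never attached to an operator then surfaces a clear error at the call site, which is the behaviour the review asks to be covered by a test.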
Re: [PR] [SPARK-46961][SS] Using ProcessorContext to store and retrieve handle [spark]
anishshri-db commented on code in PR #45002:
URL: https://github.com/apache/spark/pull/45002#discussion_r1481972279

##
sql/api/src/main/scala/org/apache/spark/sql/streaming/StatefulProcessor.scala:
##
@@ -60,4 +57,21 @@ private[sql] trait StatefulProcessor[K, I, O] extends Serializable {
    * any cleanup or teardown operations.
    */
   def close (): Unit

Review Comment:
Can you also add an empty default impl for this?
Re: [PR] [SPARK-46961][SS] Using ProcessorContext to store and retrieve handle [spark]
anishshri-db commented on code in PR #45002:
URL: https://github.com/apache/spark/pull/45002#discussion_r1481970354

##
sql/api/src/main/scala/org/apache/spark/sql/streaming/StatefulProcessor.scala:
##
@@ -60,4 +57,21 @@ private[sql] trait StatefulProcessor[K, I, O] extends Serializable {
    * any cleanup or teardown operations.
    */
   def close (): Unit
+
+  /**
+   * Function to set the stateful processor handle that will be used to interact with the state
+   * store and other stateful processor related operations.
+   * @param handle - instance of StatefulProcessorHandle
+   */
+  final def setStatefulProcessorHandle(handle: StatefulProcessorHandle): Unit = {

Review Comment:
Maybe just say `setHandle` and `getHandle` for simplicity?
Re: [PR] [SPARK-46961][SS] Using ProcessorContext to store and retrieve handle [spark]
anishshri-db commented on code in PR #45002:
URL: https://github.com/apache/spark/pull/45002#discussion_r1481970354

##
sql/api/src/main/scala/org/apache/spark/sql/streaming/StatefulProcessor.scala:
##
@@ -60,4 +57,21 @@ private[sql] trait StatefulProcessor[K, I, O] extends Serializable {
    * any cleanup or teardown operations.
    */
   def close (): Unit
+
+  /**
+   * Function to set the stateful processor handle that will be used to interact with the state
+   * store and other stateful processor related operations.
+   * @param handle - instance of StatefulProcessorHandle
+   */
+  final def setStatefulProcessorHandle(handle: StatefulProcessorHandle): Unit = {

Review Comment:
Maybe just say `setHandle` and `getHandle` for simplicity? `StatefulProcessor` is kind of implied here?
Re: [PR] [SPARK-46961][SS] Using ProcessorContext to store and retrieve handle [spark]
anishshri-db commented on code in PR #45002:
URL: https://github.com/apache/spark/pull/45002#discussion_r1481969778

##
sql/api/src/main/scala/org/apache/spark/sql/streaming/StatefulProcessor.scala:
##
@@ -27,19 +27,16 @@ import org.apache.spark.annotation.{Evolving, Experimental}
  */
 @Experimental
 @Evolving
-private[sql] trait StatefulProcessor[K, I, O] extends Serializable {
+private[sql] abstract class StatefulProcessor[K, I, O] extends Serializable {

Review Comment:
Do we need to change this?