[GitHub] [spark] HeartSaVioR commented on a change in pull request #26935: [SPARK-30294][SS] Explicitly defines read-only StateStore and optimize for HDFSBackedStateStore

2020-10-29 Thread GitBox


HeartSaVioR commented on a change in pull request #26935:
URL: https://github.com/apache/spark/pull/26935#discussion_r514697997



##
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala
##
@@ -29,14 +29,51 @@ import org.apache.spark.sql.internal.SessionState
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SerializableConfiguration
 
+// This doesn't directly override RDD methods as MiMa complains it.
+abstract class BaseStateStoreRDD[T: ClassTag, U: ClassTag](
+    dataRDD: RDD[T],
+    checkpointLocation: String,
+    queryRunId: UUID,
+    operatorId: Long,
+    sessionState: SessionState,
+    @transient private val storeCoordinator: Option[StateStoreCoordinatorRef],
+    extraOptions: Map[String, String] = Map.empty) extends RDD[U](dataRDD) {
+
+  protected val storeConf = new StateStoreConf(sessionState.conf, extraOptions)
+
+  // A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
+  protected val hadoopConfBroadcast = dataRDD.context.broadcast(
+    new SerializableConfiguration(sessionState.newHadoopConf()))
+
+  /** Implementations can simply call this method in getPreferredLocations. */
+  protected def _getPartitions: Array[Partition] = dataRDD.partitions

Review comment:
   I don't mean that I don't like the change; I mean this shouldn't be blocked any longer by something which can be done as a follow-up. If you'd like to volunteer to look into the issue, I'm happy with that, "as a follow-up PR".








[GitHub] [spark] HeartSaVioR commented on a change in pull request #26935: [SPARK-30294][SS] Explicitly defines read-only StateStore and optimize for HDFSBackedStateStore

2020-10-29 Thread GitBox


HeartSaVioR commented on a change in pull request #26935:
URL: https://github.com/apache/spark/pull/26935#discussion_r514694190



##
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala
##
@@ -29,14 +29,51 @@ import org.apache.spark.sql.internal.SessionState
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SerializableConfiguration
 
+// This doesn't directly override RDD methods as MiMa complains it.
+abstract class BaseStateStoreRDD[T: ClassTag, U: ClassTag](
+    dataRDD: RDD[T],
+    checkpointLocation: String,
+    queryRunId: UUID,
+    operatorId: Long,
+    sessionState: SessionState,
+    @transient private val storeCoordinator: Option[StateStoreCoordinatorRef],
+    extraOptions: Map[String, String] = Map.empty) extends RDD[U](dataRDD) {
+
+  protected val storeConf = new StateStoreConf(sessionState.conf, extraOptions)
+
+  // A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
+  protected val hadoopConfBroadcast = dataRDD.context.broadcast(
+    new SerializableConfiguration(sessionState.newHadoopConf()))
+
+  /** Implementations can simply call this method in getPreferredLocations. */
+  protected def _getPartitions: Array[Partition] = dataRDD.partitions

Review comment:
   That was my understanding as well, and I wouldn't do it in this PR. Based on the error message, what we would need to add is an exclusion for some pattern against RDD, which I'm not 100% sure everyone would agree with. That's why I said "additional discussion".
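
   For reference, a hedged sketch of what such an exclusion could look like in project/MimaExcludes.scala - the problem filter types and the affected names here are illustrative assumptions, not a vetted rule:

   import com.typesafe.tools.mima.core._

   object StateStoreRddExcludes {
     // Hypothetical entries: allow StateStoreRDD to gain a new parent class and
     // let its constructor signature change. Which filters MiMa would actually
     // need here is precisely the open question above.
     lazy val excludes = Seq(
       ProblemFilters.exclude[MissingTypesProblem](
         "org.apache.spark.sql.execution.streaming.state.StateStoreRDD"),
       ProblemFilters.exclude[IncompatibleMethTypeProblem](
         "org.apache.spark.sql.execution.streaming.state.StateStoreRDD.this")
     )
   }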








[GitHub] [spark] HeartSaVioR commented on a change in pull request #26935: [SPARK-30294][SS] Explicitly defines read-only StateStore and optimize for HDFSBackedStateStore

2020-10-29 Thread GitBox


HeartSaVioR commented on a change in pull request #26935:
URL: https://github.com/apache/spark/pull/26935#discussion_r514612885



##
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
##
@@ -36,10 +36,14 @@ import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.{ThreadUtils, Utils}
 
 /**
- * Base trait for a versioned key-value store. Each instance of a `StateStore` represents a specific
- * version of state data, and such instances are created through a [[StateStoreProvider]].
+ * Base trait for a versioned key-value store which provides read operations. Each instance of a
+ * `ReadStateStore` represents a specific version of state data, and such instances are created
+ * through a [[StateStoreProvider]].
+ *
+ * `abort` method will be called when the task is completed - please clean up the resources in
+ * the method.
  */
-trait StateStore {
+trait ReadStateStore {

Review comment:
   Ah sorry, I was reminded of the comment from @viirya and I still think his comment makes more sense. I'll rename the others for consistency for now, but I'm also open to renaming this to ReadOnlyStateStore, if both of you think it's better after reading the comment from @viirya.
   
   @xuanyuanking @gaborgsomogyi Could you please go through the previous comment below and share what you think?
   https://github.com/apache/spark/pull/26935#discussion_r491747706








[GitHub] [spark] HeartSaVioR commented on a change in pull request #26935: [SPARK-30294][SS] Explicitly defines read-only StateStore and optimize for HDFSBackedStateStore

2020-10-29 Thread GitBox


HeartSaVioR commented on a change in pull request #26935:
URL: https://github.com/apache/spark/pull/26935#discussion_r514609139



##
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
##
@@ -36,10 +36,14 @@ import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.{ThreadUtils, Utils}
 
 /**
- * Base trait for a versioned key-value store. Each instance of a `StateStore` represents a specific
- * version of state data, and such instances are created through a [[StateStoreProvider]].
+ * Base trait for a versioned key-value store which provides read operations. Each instance of a
+ * `ReadStateStore` represents a specific version of state data, and such instances are created
+ * through a [[StateStoreProvider]].
+ *
+ * `abort` method will be called when the task is completed - please clean up the resources in
+ * the method.
  */
-trait StateStore {
+trait ReadStateStore {

Review comment:
   OK I'll rename it. No big deal.








[GitHub] [spark] HeartSaVioR commented on a change in pull request #26935: [SPARK-30294][SS] Explicitly defines read-only StateStore and optimize for HDFSBackedStateStore

2020-10-29 Thread GitBox


HeartSaVioR commented on a change in pull request #26935:
URL: https://github.com/apache/spark/pull/26935#discussion_r514598660



##
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
##
@@ -89,16 +116,16 @@ trait StateStore {
   def commit(): Long
 
   /**
-   * Abort all the updates that have been made to the store. Implementations should ensure that
-   * no more updates (puts, removes) can be after an abort in order to avoid incorrect usage.
+   * Return an iterator containing all the key-value pairs in the StateStore. Implementations must

Review comment:
   Oh, I just indicated that the order is swapped. Will fix.
   
   Btw I don't feel the original order is good anyway - I think `iterator` should have been placed before `commit`, so that all read/write operations are adjacent and the commit lifecycle operations come later. That can go through another MINOR PR though.








[GitHub] [spark] HeartSaVioR commented on a change in pull request #26935: [SPARK-30294][SS] Explicitly defines read-only StateStore and optimize for HDFSBackedStateStore

2020-10-29 Thread GitBox


HeartSaVioR commented on a change in pull request #26935:
URL: https://github.com/apache/spark/pull/26935#discussion_r514596151



##
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala
##
@@ -29,14 +29,51 @@ import org.apache.spark.sql.internal.SessionState
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SerializableConfiguration
 
+// This doesn't directly override RDD methods as MiMa complains it.
+abstract class BaseStateStoreRDD[T: ClassTag, U: ClassTag](
+    dataRDD: RDD[T],
+    checkpointLocation: String,
+    queryRunId: UUID,
+    operatorId: Long,
+    sessionState: SessionState,
+    @transient private val storeCoordinator: Option[StateStoreCoordinatorRef],
+    extraOptions: Map[String, String] = Map.empty) extends RDD[U](dataRDD) {
+
+  protected val storeConf = new StateStoreConf(sessionState.conf, extraOptions)
+
+  // A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
+  protected val hadoopConfBroadcast = dataRDD.context.broadcast(
+    new SerializableConfiguration(sessionState.newHadoopConf()))
+
+  /** Implementations can simply call this method in getPreferredLocations. */
+  protected def _getPartitions: Array[Partition] = dataRDD.partitions

Review comment:
   I wouldn't want to touch the RDD side just because of this change, as that may bring unnecessary additional discussion. I don't see this as a blocker for this PR. Anyone can deal with it as a follow-up, with room for discussion.








[GitHub] [spark] HeartSaVioR commented on a change in pull request #26935: [SPARK-30294][SS] Explicitly defines read-only StateStore and optimize for HDFSBackedStateStore

2020-10-29 Thread GitBox


HeartSaVioR commented on a change in pull request #26935:
URL: https://github.com/apache/spark/pull/26935#discussion_r514594697



##
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
##
@@ -389,7 +453,19 @@ object StateStore extends Logging {
       storeConf: StateStoreConf,
       hadoopConf: Configuration): StateStore = {

Review comment:
   You seem to have missed the difference in return types between the two. They were actually one method, and I had to split it to support the different return types.
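
   For illustration, a condensed sketch of that split inside `object StateStore` (parameter lists trimmed; both methods are assumed to resolve the provider the same way):

   // Same lookup, different return types - this is why it could not stay one method.
   def get(id: StateStoreProviderId, version: Long /* , ... */): StateStore =
     getStateStoreProvider(id).getStore(version)

   def getReadOnly(id: StateStoreProviderId, version: Long /* , ... */): ReadStateStore =
     getStateStoreProvider(id).getReadOnlyStore(version)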








[GitHub] [spark] HeartSaVioR commented on a change in pull request #26935: [SPARK-30294][SS] Explicitly defines read-only StateStore and optimize for HDFSBackedStateStore

2020-10-27 Thread GitBox


HeartSaVioR commented on a change in pull request #26935:
URL: https://github.com/apache/spark/pull/26935#discussion_r512665393



##
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
##
@@ -197,15 +212,26 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
   }
 
   /** Get the state store for making updates to create a new `version` of the store. */
-  override def getStore(version: Long): StateStore = synchronized {
+  override def getStore(version: Long): StateStore = {
+    val newMap = getLoadedMapForStore(version)
+    logInfo(s"Retrieved version $version of ${HDFSBackedStateStoreProvider.this} for update")
+    new HDFSBackedStateStore(version, newMap)
+  }
+
+  /** Get the state store for reading to specific `version` of the store. */
+  override def getReadOnlyStore(version: Long): ReadStateStore = {

Review comment:
   The name of the class was ReadOnlyStateStore, and I reflected a review comment asking to rename it to ReadStateStore. If we'd like to make this consistent, I'll change this to `getReadStore`.








[GitHub] [spark] HeartSaVioR commented on a change in pull request #26935: [SPARK-30294][SS] Explicitly defines read-only StateStore and optimize for HDFSBackedStateStore

2020-10-26 Thread GitBox


HeartSaVioR commented on a change in pull request #26935:
URL: https://github.com/apache/spark/pull/26935#discussion_r512329977



##
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala
##
@@ -29,14 +29,51 @@ import org.apache.spark.sql.internal.SessionState
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SerializableConfiguration
 
+// This doesn't directly override RDD methods as MiMa complains it.
+abstract class BaseStateStoreRDD[T: ClassTag, U: ClassTag](
+    dataRDD: RDD[T],
+    checkpointLocation: String,
+    queryRunId: UUID,
+    operatorId: Long,
+    sessionState: SessionState,
+    @transient private val storeCoordinator: Option[StateStoreCoordinatorRef],
+    extraOptions: Map[String, String] = Map.empty) extends RDD[U](dataRDD) {
+
+  protected val storeConf = new StateStoreConf(sessionState.conf, extraOptions)
+
+  // A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
+  protected val hadoopConfBroadcast = dataRDD.context.broadcast(
+    new SerializableConfiguration(sessionState.newHadoopConf()))
+
+  /** Implementations can simply call this method in getPreferredLocations. */
+  protected def _getPartitions: Array[Partition] = dataRDD.partitions
+
+  /**
+   * Set the preferred location of each partition using the executor that has the related
+   * [[StateStoreProvider]] already loaded.
+   *
+   * Implementations can simply call this method in getPreferredLocations.
+   */
+  protected def _getPreferredLocations(partition: Partition): Seq[String] = {

Review comment:
   same here








[GitHub] [spark] HeartSaVioR commented on a change in pull request #26935: [SPARK-30294][SS] Explicitly defines read-only StateStore and optimize for HDFSBackedStateStore

2020-10-26 Thread GitBox


HeartSaVioR commented on a change in pull request #26935:
URL: https://github.com/apache/spark/pull/26935#discussion_r512329942



##
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala
##
@@ -29,14 +29,51 @@ import org.apache.spark.sql.internal.SessionState
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SerializableConfiguration
 
+// This doesn't directly override RDD methods as MiMa complains it.
+abstract class BaseStateStoreRDD[T: ClassTag, U: ClassTag](
+    dataRDD: RDD[T],
+    checkpointLocation: String,
+    queryRunId: UUID,
+    operatorId: Long,
+    sessionState: SessionState,
+    @transient private val storeCoordinator: Option[StateStoreCoordinatorRef],
+    extraOptions: Map[String, String] = Map.empty) extends RDD[U](dataRDD) {
+
+  protected val storeConf = new StateStoreConf(sessionState.conf, extraOptions)
+
+  // A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
+  protected val hadoopConfBroadcast = dataRDD.context.broadcast(
+    new SerializableConfiguration(sessionState.newHadoopConf()))
+
+  /** Implementations can simply call this method in getPreferredLocations. */
+  protected def _getPartitions: Array[Partition] = dataRDD.partitions

Review comment:
   As I commented in the class doc - MiMa somehow complained about it. Not 100% sure why.
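
   To make the intended usage concrete, a minimal sketch of a hypothetical subclass (assumed name; not the PR's actual classes) that forwards its RDD overrides to the protected helpers, keeping the overriding methods out of the abstract class that MiMa flags:

   import java.util.UUID
   import scala.reflect.ClassTag

   import org.apache.spark.{Partition, TaskContext}
   import org.apache.spark.rdd.RDD
   import org.apache.spark.sql.internal.SessionState

   // BaseStateStoreRDD and StateStoreCoordinatorRef are assumed to come from
   // this package, as in the excerpt above.
   class MyStateStoreRDD[T: ClassTag, U: ClassTag](
       dataRDD: RDD[T],
       checkpointLocation: String,
       queryRunId: UUID,
       operatorId: Long,
       sessionState: SessionState,
       @transient private val coordinator: Option[StateStoreCoordinatorRef])
     extends BaseStateStoreRDD[T, U](
       dataRDD, checkpointLocation, queryRunId, operatorId, sessionState, coordinator) {

     // The actual overrides live in the concrete class and just delegate.
     override protected def getPartitions: Array[Partition] = _getPartitions

     override def getPreferredLocations(partition: Partition): Seq[String] =
       _getPreferredLocations(partition)

     // Placeholder; the real subclasses open a state store and run the user function.
     override def compute(partition: Partition, ctxt: TaskContext): Iterator[U] =
       throw new UnsupportedOperationException("illustrative sketch only")
   }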








[GitHub] [spark] HeartSaVioR commented on a change in pull request #26935: [SPARK-30294][SS] Explicitly defines read-only StateStore and optimize for HDFSBackedStateStore

2020-10-26 Thread GitBox


HeartSaVioR commented on a change in pull request #26935:
URL: https://github.com/apache/spark/pull/26935#discussion_r512329430



##
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
##
@@ -379,6 +428,21 @@ object StateStore extends Logging {
   @GuardedBy("loadedProviders")
   private var _coordRef: StateStoreCoordinatorRef = null
 
+  /** Get or create a read-only store associated with the id. */
+  def getReadOnly(
+      storeProviderId: StateStoreProviderId,
+      keySchema: StructType,
+      valueSchema: StructType,
+      indexOrdinal: Option[Int],
+      version: Long,
+      storeConf: StateStoreConf,
+      hadoopConf: Configuration): ReadStateStore = {
+    require(version >= 0)

Review comment:
   `getStateStoreProvider` doesn't relate to the version, right? I wouldn't couple the two only to reduce code duplication.








[GitHub] [spark] HeartSaVioR commented on a change in pull request #26935: [SPARK-30294][SS] Explicitly defines read-only StateStore and optimize for HDFSBackedStateStore

2020-10-26 Thread GitBox


HeartSaVioR commented on a change in pull request #26935:
URL: https://github.com/apache/spark/pull/26935#discussion_r512328018



##
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
##
@@ -89,16 +116,16 @@ trait StateStore {
   def commit(): Long
 
   /**
-   * Abort all the updates that have been made to the store. Implementations should ensure that
-   * no more updates (puts, removes) can be after an abort in order to avoid incorrect usage.
+   * Return an iterator containing all the key-value pairs in the StateStore. Implementations must
+   * ensure that updates (puts, removes) can be made while iterating over this iterator.
   */
-  def abort(): Unit
+  override def iterator(): Iterator[UnsafeRowPair]
 
   /**
-   * Return an iterator containing all the key-value pairs in the StateStore. Implementations must
-   * ensure that updates (puts, removes) can be made while iterating over this iterator.
+   * Abort all the updates that have been made to the store. Implementations should ensure that
+   * no more updates (puts, removes) can be after an abort in order to avoid incorrect usage.
   */
-  def iterator(): Iterator[UnsafeRowPair]
+  override def abort(): Unit

Review comment:
   The pattern `prepare -> commit or abort -> (abort if commit fails)` is widely used across Spark, in which both `commit` and `abort` carry the responsibility of cleaning up. (`abort` needs to be careful not to be impacted by double cleanup.)
   
   In ReadStateStore, the pattern is simplified to `prepare -> abort`, in which `abort` alone carries the responsibility of cleaning up.
   
   I see one case where the issue may come up:
   
   - `class A implements ReadStateStore`
   - `class B extends A implements StateStore`
   - Both A and B have their own resources that need to be cleaned up.
   
   In this case, correctly cleaning up A's resources from B becomes non-trivial. B can call `A.abort` in both `B.commit` and `B.abort` to make sure A's resources are cleaned up, but it is B's responsibility not to call `A.abort` multiple times, as A follows the simplified pattern and assumes there is no double cleanup. A toy version of this hazard is sketched below.
   
   The answer is probably the same as before: we know the problem - we need a `close` method - but backward compatibility also matters, so we would have to provide a default implementation of `close`, which is very tricky for current implementations (`close` would have to rely on the resource cleanup in `commit`/`abort`), and we decided not to.
   
   My feeling is that once a provider has different implementations for ReadStateStore and StateStore, they would be separate implementations (no inheritance between them). If we aren't quite sure of that and don't want to leave the possibility open, then adding `close` could probably be reconsidered.
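
   A self-contained toy version of that hazard (the traits are trimmed-down stand-ins for ReadStateStore/StateStore, and A/B are the hypothetical classes above, not actual Spark code):

   trait ReadStore { def abort(): Unit }    // simplified pattern: prepare -> abort
   trait WriteStore extends ReadStore {     // full pattern: prepare -> commit or abort
     def commit(): Long
   }

   class A extends ReadStore {
     // A follows the simplified pattern and assumes abort() runs exactly once.
     override def abort(): Unit = println("A: release read resources")
   }

   class B extends A with WriteStore {
     private var aCleaned = false
     // The guard must live in B: A.abort() has to be reached from both commit()
     // and abort(), but never twice, since A does not expect double cleanup.
     private def cleanupA(): Unit = if (!aCleaned) { aCleaned = true; super.abort() }

     def commit(): Long = { println("B: commit write resources"); cleanupA(); 1L }
     override def abort(): Unit = { println("B: abort write resources"); cleanupA() }
   }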








[GitHub] [spark] HeartSaVioR commented on a change in pull request #26935: [SPARK-30294][SS] Explicitly defines read-only StateStore and optimize for HDFSBackedStateStore

2020-10-26 Thread GitBox


HeartSaVioR commented on a change in pull request #26935:
URL: https://github.com/apache/spark/pull/26935#discussion_r512313839



##
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
##
@@ -89,16 +116,16 @@ trait StateStore {
   def commit(): Long
 
   /**
-   * Abort all the updates that have been made to the store. Implementations should ensure that
-   * no more updates (puts, removes) can be after an abort in order to avoid incorrect usage.
+   * Return an iterator containing all the key-value pairs in the StateStore. Implementations must
+   * ensure that updates (puts, removes) can be made while iterating over this iterator.
   */
-  def abort(): Unit
+  override def iterator(): Iterator[UnsafeRowPair]

Review comment:
   Yes. Unfortunately we've figured out that the requirements on the `iterator` implementation are a bit different between the two. That said, someone could leverage the fact that the iterator in ReadOnlyStateStore doesn't need to care about updates.
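
   For illustration, a hypothetical method body showing that leverage for a ConcurrentHashMap-backed read-only store (assumed field name `map`; roughly what a store like HDFSBackedReadOnlyStateStore could do, not the committed code):

   import scala.collection.JavaConverters._

   // A plain view over the loaded map is enough here: no put/remove has to be
   // tolerated while iterating, so no defensive copy or snapshot is needed.
   override def iterator(): Iterator[UnsafeRowPair] = {
     val pair = new UnsafeRowPair()
     map.entrySet.asScala.iterator.map(e => pair.withRows(e.getKey, e.getValue))
   }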








[GitHub] [spark] HeartSaVioR commented on a change in pull request #26935: [SPARK-30294][SS] Explicitly defines read-only StateStore and optimize for HDFSBackedStateStore

2020-10-26 Thread GitBox


HeartSaVioR commented on a change in pull request #26935:
URL: https://github.com/apache/spark/pull/26935#discussion_r512312263



##
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
##
@@ -81,6 +74,40 @@ trait StateStore {
     iterator()
   }
 
+  /** Return an iterator containing all the key-value pairs in the StateStore. */
+  def iterator(): Iterator[UnsafeRowPair]
+
+  /**
+   * Clean up the resource.
+   *
+   * The method name is to respect backward compatibility on [[StateStore]].

Review comment:
   `abort` is the only one which is not properly named from the point of view of ReadStateStore. It should be `close`, but we found it tricky to respect backward compatibility while having `close`, hence we decided to leave it as it is.
   
   Please refer to https://github.com/apache/spark/pull/26935#discussion_r493236101








[GitHub] [spark] HeartSaVioR commented on a change in pull request #26935: [SPARK-30294][SS] Explicitly defines read-only StateStore and optimize for HDFSBackedStateStore

2020-10-18 Thread GitBox


HeartSaVioR commented on a change in pull request #26935:
URL: https://github.com/apache/spark/pull/26935#discussion_r507380414



##
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
##
@@ -81,6 +74,43 @@ trait StateStore {
     iterator()
   }
 
+  /**
+   * Return an iterator containing all the key-value pairs in the StateStore. Implementations must
+   * ensure that updates (puts, removes) can be made while iterating over this iterator.

Review comment:
   Yeah right. Let me split `ReadStateStore` and `StateStore`.








[GitHub] [spark] HeartSaVioR commented on a change in pull request #26935: [SPARK-30294][SS] Explicitly defines read-only StateStore and optimize for HDFSBackedStateStore

2020-10-02 Thread GitBox


HeartSaVioR commented on a change in pull request #26935:
URL: https://github.com/apache/spark/pull/26935#discussion_r498784149



##
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala
##
@@ -29,6 +29,57 @@ import org.apache.spark.sql.internal.SessionState
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SerializableConfiguration
 
+/**
+ * An RDD that allows computations to be executed against [[ReadOnlyStateStore]]s. It
+ * uses the [[StateStoreCoordinator]] to get the locations of loaded state stores
+ * and use that as the preferred locations.
+ */
+class ReadOnlyStateStoreRDD[T: ClassTag, U: ClassTag](
+    dataRDD: RDD[T],
+    storeReadFunction: (ReadOnlyStateStore, Iterator[T]) => Iterator[U],
+    checkpointLocation: String,
+    queryRunId: UUID,
+    operatorId: Long,
+    storeVersion: Long,
+    keySchema: StructType,
+    valueSchema: StructType,
+    indexOrdinal: Option[Int],
+    sessionState: SessionState,
+    @transient private val storeCoordinator: Option[StateStoreCoordinatorRef],
+    extraOptions: Map[String, String] = Map.empty) extends RDD[U](dataRDD) {
+
+  private val storeConf = new StateStoreConf(sessionState.conf, extraOptions)
+
+  // A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
+  private val hadoopConfBroadcast = dataRDD.context.broadcast(
+    new SerializableConfiguration(sessionState.newHadoopConf()))
+
+  override protected def getPartitions: Array[Partition] = dataRDD.partitions
+
+  /**
+   * Set the preferred location of each partition using the executor that has the related
+   * [[StateStoreProvider]] already loaded.
+   */
+  override def getPreferredLocations(partition: Partition): Seq[String] = {
+    val stateStoreProviderId = StateStoreProviderId(
+      StateStoreId(checkpointLocation, operatorId, partition.index),
+      queryRunId)
+    storeCoordinator.flatMap(_.getLocation(stateStoreProviderId)).toSeq
+  }
+
+  override def compute(partition: Partition, ctxt: TaskContext): Iterator[U] = {
+    val storeProviderId = StateStoreProviderId(
+      StateStoreId(checkpointLocation, operatorId, partition.index),
+      queryRunId)
+
+    val store = StateStore.getReadOnly(
+      storeProviderId, keySchema, valueSchema, indexOrdinal, storeVersion,

Review comment:
   OK just reverted 5c70db0.








[GitHub] [spark] HeartSaVioR commented on a change in pull request #26935: [SPARK-30294][SS] Explicitly defines read-only StateStore and optimize for HDFSBackedStateStore

2020-09-22 Thread GitBox


HeartSaVioR commented on a change in pull request #26935:
URL: https://github.com/apache/spark/pull/26935#discussion_r493236101



##
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala
##
@@ -29,6 +29,57 @@ import org.apache.spark.sql.internal.SessionState
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SerializableConfiguration
 
+/**
+ * An RDD that allows computations to be executed against [[ReadOnlyStateStore]]s. It
+ * uses the [[StateStoreCoordinator]] to get the locations of loaded state stores
+ * and use that as the preferred locations.
+ */
+class ReadOnlyStateStoreRDD[T: ClassTag, U: ClassTag](
+    dataRDD: RDD[T],
+    storeReadFunction: (ReadOnlyStateStore, Iterator[T]) => Iterator[U],
+    checkpointLocation: String,
+    queryRunId: UUID,
+    operatorId: Long,
+    storeVersion: Long,
+    keySchema: StructType,
+    valueSchema: StructType,
+    indexOrdinal: Option[Int],
+    sessionState: SessionState,
+    @transient private val storeCoordinator: Option[StateStoreCoordinatorRef],
+    extraOptions: Map[String, String] = Map.empty) extends RDD[U](dataRDD) {
+
+  private val storeConf = new StateStoreConf(sessionState.conf, extraOptions)
+
+  // A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
+  private val hadoopConfBroadcast = dataRDD.context.broadcast(
+    new SerializableConfiguration(sessionState.newHadoopConf()))
+
+  override protected def getPartitions: Array[Partition] = dataRDD.partitions
+
+  /**
+   * Set the preferred location of each partition using the executor that has the related
+   * [[StateStoreProvider]] already loaded.
+   */
+  override def getPreferredLocations(partition: Partition): Seq[String] = {
+    val stateStoreProviderId = StateStoreProviderId(
+      StateStoreId(checkpointLocation, operatorId, partition.index),
+      queryRunId)
+    storeCoordinator.flatMap(_.getLocation(stateStoreProviderId)).toSeq
+  }
+
+  override def compute(partition: Partition, ctxt: TaskContext): Iterator[U] = {
+    val storeProviderId = StateStoreProviderId(
+      StateStoreId(checkpointLocation, operatorId, partition.index),
+      queryRunId)
+
+    val store = StateStore.getReadOnly(
+      storeProviderId, keySchema, valueSchema, indexOrdinal, storeVersion,

Review comment:
   Just dealt with it in 5c70db0.
   
   The `close` method throws UnsupportedException by default, and callers catch it. The change considers backward compatibility like below:
   
   1.
   If the implementation of StateStore isn't aware of the `close` method, it should have implemented the resource cleanup in `commit` and `abort`, and Spark will call them to clean up resources. Spark will also call `close`, which will be a no-op.
   
   For a read-only state store instance, it would use WrappedReadStateStore, which calls both `abort` and `close` in its `close` method, and `abort` should handle the resource cleanup.
   
   2.
   If the implementation of StateStore is aware of the `close` method and implements it correctly, it should have migrated the resource cleanup to `close`, and Spark will call it to clean up resources.
   
   For a read-only state store instance, it may override `StateStoreProvider.getReadOnlyStore` (as HDFSBackedStateStoreProvider does) to provide its own ReadOnlyStore implementation. Or it may still use WrappedReadStateStore, which calls both `abort` and `close` in its `close` method; `abort` would probably be a no-op after migrating the resource cleanup to `close`, and `close` should handle the resource cleanup.
   
   If this sounds too complicated, let's revert 5c70db0 and live with the `abort` method.
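
   A rough sketch of that (since reverted) shape, with assumed names and simplified types - not the exact 5c70db0 code:

   trait ReadStateStore {
     def abort(): Unit
     // Default for pre-existing implementations that are unaware of close().
     def close(): Unit = throw new UnsupportedOperationException("close not implemented")
   }

   // Case 1 wrapper: a close()-unaware store still cleans up via abort();
   // the wrapper calls both and swallows the default close() error.
   class WrappedReadStateStore(store: ReadStateStore) extends ReadStateStore {
     override def abort(): Unit = store.abort()
     override def close(): Unit = {
       store.abort()
       try store.close() catch { case _: UnsupportedOperationException => }
     }
   }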








[GitHub] [spark] HeartSaVioR commented on a change in pull request #26935: [SPARK-30294][SS] Explicitly defines read-only StateStore and optimize for HDFSBackedStateStore

2020-09-20 Thread GitBox


HeartSaVioR commented on a change in pull request #26935:
URL: https://github.com/apache/spark/pull/26935#discussion_r491802750



##
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
##
@@ -81,6 +74,42 @@ trait StateStore {
     iterator()
   }
 
+  /**
+   * Return an iterator containing all the key-value pairs in the StateStore. Implementations must
+   * ensure that updates (puts, removes) can be made while iterating over this iterator.
+   */
+  def iterator(): Iterator[UnsafeRowPair]
+
+  /**
+   * Clean up the resource.
+   *
+   * The method name is to respect backward compatibility on [[StateStore]].
+   */
+  def abort(): Unit
+}
+
+/**
+ * Base trait for a versioned key-value store. Each instance of a `StateStore` represents a specific
+ * version of state data, and such instances are created through a [[StateStoreProvider]].
+ *
+ * Unlike [[ReadOnlyStateStore]], `abort` method may not be called if the `commit` method succeeds
+ * to commit the change. (`hasCommitted` returns `true`.) Otherwise, `abort` method will be called.

Review comment:
   If we really want to be sure, we can add `close()` with an empty implementation by default, and let Spark guarantee to call the method regardless of whether commit() / abort() was called. For existing implementations, we can make close() call abort() in WrappedReadStateStore, so that abort() is called once and cleans up the resources. Would that make sense to you?








[GitHub] [spark] HeartSaVioR commented on a change in pull request #26935: [SPARK-30294][SS] Explicitly defines read-only StateStore and optimize for HDFSBackedStateStore

2020-09-20 Thread GitBox


HeartSaVioR commented on a change in pull request #26935:
URL: https://github.com/apache/spark/pull/26935#discussion_r491748504



##
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
##
@@ -81,6 +74,42 @@ trait StateStore {
     iterator()
   }
 
+  /**
+   * Return an iterator containing all the key-value pairs in the StateStore. Implementations must
+   * ensure that updates (puts, removes) can be made while iterating over this iterator.
+   */
+  def iterator(): Iterator[UnsafeRowPair]
+
+  /**
+   * Clean up the resource.
+   *
+   * The method name is to respect backward compatibility on [[StateStore]].
+   */
+  def abort(): Unit
+}
+
+/**
+ * Base trait for a versioned key-value store. Each instance of a `StateStore` represents a specific
+ * version of state data, and such instances are created through a [[StateStoreProvider]].
+ *
+ * Unlike [[ReadOnlyStateStore]], `abort` method may not be called if the `commit` method succeeds
+ * to commit the change. (`hasCommitted` returns `true`.) Otherwise, `abort` method will be called.
+ * Implementation should deal with resource cleanup in both methods, but also need to guard with
+ * double resource cleanup.
+ */
+trait StateStore extends ReadOnlyStateStore {

Review comment:
   Ah OK, the interface name seems to bring confusion. `ReadStateStore` looks OK to me. Thanks!








[GitHub] [spark] HeartSaVioR commented on a change in pull request #26935: [SPARK-30294][SS] Explicitly defines read-only StateStore and optimize for HDFSBackedStateStore

2020-09-20 Thread GitBox


HeartSaVioR commented on a change in pull request #26935:
URL: https://github.com/apache/spark/pull/26935#discussion_r491748211



##
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala
##
@@ -29,6 +29,57 @@ import org.apache.spark.sql.internal.SessionState
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SerializableConfiguration
 
+/**
+ * An RDD that allows computations to be executed against [[ReadOnlyStateStore]]s. It
+ * uses the [[StateStoreCoordinator]] to get the locations of loaded state stores
+ * and use that as the preferred locations.
+ */
+class ReadOnlyStateStoreRDD[T: ClassTag, U: ClassTag](
+    dataRDD: RDD[T],
+    storeReadFunction: (ReadOnlyStateStore, Iterator[T]) => Iterator[U],
+    checkpointLocation: String,
+    queryRunId: UUID,
+    operatorId: Long,
+    storeVersion: Long,
+    keySchema: StructType,
+    valueSchema: StructType,
+    indexOrdinal: Option[Int],
+    sessionState: SessionState,
+    @transient private val storeCoordinator: Option[StateStoreCoordinatorRef],
+    extraOptions: Map[String, String] = Map.empty) extends RDD[U](dataRDD) {
+
+  private val storeConf = new StateStoreConf(sessionState.conf, extraOptions)
+
+  // A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
+  private val hadoopConfBroadcast = dataRDD.context.broadcast(
+    new SerializableConfiguration(sessionState.newHadoopConf()))
+
+  override protected def getPartitions: Array[Partition] = dataRDD.partitions
+
+  /**
+   * Set the preferred location of each partition using the executor that has the related
+   * [[StateStoreProvider]] already loaded.
+   */
+  override def getPreferredLocations(partition: Partition): Seq[String] = {
+    val stateStoreProviderId = StateStoreProviderId(
+      StateStoreId(checkpointLocation, operatorId, partition.index),
+      queryRunId)
+    storeCoordinator.flatMap(_.getLocation(stateStoreProviderId)).toSeq
+  }
+
+  override def compute(partition: Partition, ctxt: TaskContext): Iterator[U] = {
+    val storeProviderId = StateStoreProviderId(
+      StateStoreId(checkpointLocation, operatorId, partition.index),
+      queryRunId)
+
+    val store = StateStore.getReadOnly(
+      storeProviderId, keySchema, valueSchema, indexOrdinal, storeVersion,

Review comment:
   The return type differs between the two, and the type of the state function is also different. That's the main intention of the new changes - storeReadFunction can never modify the state, as it gets a ReadOnlyStateStore, whereas storeUpdateFunction can.
   
   I agree lots of code is duplicated - that's why I extracted an abstract class, and as you saw, MiMa complained. I don't know why MiMa complained, as I don't see anything weird in the previous code, but I might be missing something, so I had to make MiMa happy for now.








[GitHub] [spark] HeartSaVioR commented on a change in pull request #26935: [SPARK-30294][SS] Explicitly defines read-only StateStore and optimize for HDFSBackedStateStore

2020-09-20 Thread GitBox


HeartSaVioR commented on a change in pull request #26935:
URL: https://github.com/apache/spark/pull/26935#discussion_r491747582



##
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
##
@@ -81,6 +74,42 @@ trait StateStore {
     iterator()
   }
 
+  /**
+   * Return an iterator containing all the key-value pairs in the StateStore. Implementations must
+   * ensure that updates (puts, removes) can be made while iterating over this iterator.
+   */
+  def iterator(): Iterator[UnsafeRowPair]
+
+  /**
+   * Clean up the resource.
+   *
+   * The method name is to respect backward compatibility on [[StateStore]].
+   */
+  def abort(): Unit
+}
+
+/**
+ * Base trait for a versioned key-value store. Each instance of a `StateStore` represents a specific
+ * version of state data, and such instances are created through a [[StateStoreProvider]].
+ *
+ * Unlike [[ReadOnlyStateStore]], `abort` method may not be called if the `commit` method succeeds
+ * to commit the change. (`hasCommitted` returns `true`.) Otherwise, `abort` method will be called.

Review comment:
   I agree, but not breaking backward compatibility is important, as I know of several "ecosystem" implementations. That's why I added "different" javadoc between the two.
   
   If I had a chance to redesign, I would add close() as well, which would be called to clean up resources regardless of which method was called (commit/abort) - so for ReadOnlyStateStore there would be no need to have abort() and only close() would be called, and for StateStore, close() would be called too after commit() and/or abort().








[GitHub] [spark] HeartSaVioR commented on a change in pull request #26935: [SPARK-30294][SS] Explicitly defines read-only StateStore and optimize for HDFSBackedStateStore

2020-09-20 Thread GitBox


HeartSaVioR commented on a change in pull request #26935:
URL: https://github.com/apache/spark/pull/26935#discussion_r491746910



##
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
##
@@ -81,6 +74,42 @@ trait StateStore {
     iterator()
   }
 
+  /**
+   * Return an iterator containing all the key-value pairs in the StateStore. Implementations must
+   * ensure that updates (puts, removes) can be made while iterating over this iterator.
+   */
+  def iterator(): Iterator[UnsafeRowPair]
+
+  /**
+   * Clean up the resource.
+   *
+   * The method name is to respect backward compatibility on [[StateStore]].
+   */
+  def abort(): Unit
+}
+
+/**
+ * Base trait for a versioned key-value store. Each instance of a `StateStore` represents a specific
+ * version of state data, and such instances are created through a [[StateStoreProvider]].
+ *
+ * Unlike [[ReadOnlyStateStore]], `abort` method may not be called if the `commit` method succeeds
+ * to commit the change. (`hasCommitted` returns `true`.) Otherwise, `abort` method will be called.
+ * Implementation should deal with resource cleanup in both methods, but also need to guard with
+ * double resource cleanup.
+ */
+trait StateStore extends ReadOnlyStateStore {

Review comment:
   Yes, because existing implementations only have StateStore and we have to support backward compatibility. Please share the idea if you have a better one.
   
   I think this change is clearer than the initial version of the PR - we now enforce in ReadOnlyStateStore that writes are simply impossible at the interface level. (Previously I added a test to verify that any writes throw an exception, and I removed that test as writes are simply not possible now.)
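
   For readers following along, a condensed shape of the split (method signatures trimmed to the ones relevant here; UnsafeRowPair is the existing key-value pair class in this package):

   import org.apache.spark.sql.catalyst.expressions.UnsafeRow

   trait ReadOnlyStateStore {
     def get(key: UnsafeRow): UnsafeRow
     def iterator(): Iterator[UnsafeRowPair]
     def abort(): Unit
   }

   trait StateStore extends ReadOnlyStateStore {
     def put(key: UnsafeRow, value: UnsafeRow): Unit
     def remove(key: UnsafeRow): Unit
     def commit(): Long
   }

   // Writes do not exist on the read-only trait, so misuse fails at compile
   // time - which is why the runtime "writes must throw" test could go:
   // def bad(store: ReadOnlyStateStore): Unit = store.put(key, value)  // does not compile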








[GitHub] [spark] HeartSaVioR commented on a change in pull request #26935: [SPARK-30294][SS] Explicitly defines read-only StateStore and optimize for HDFSBackedStateStore

2020-09-19 Thread GitBox


HeartSaVioR commented on a change in pull request #26935:
URL: https://github.com/apache/spark/pull/26935#discussion_r491332970



##
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
##
@@ -253,7 +253,8 @@ case class StateStoreRestoreExec(
       stateManager.getStateValueSchema,
       indexOrdinal = None,
       sqlContext.sessionState,
-      Some(sqlContext.streams.stateStoreCoordinator)) { case (store, iter) =>
+      Some(sqlContext.streams.stateStoreCoordinator),
+      readOnly = true) { case (store, iter) =>

Review comment:
   Yes. All other cases do read and write in the same place. Streaming aggregation handles aggregation as `do aggregation in each partition` -> `shuffle` -> `enrich accumulated values via loading from state store` -> `do aggregation` -> `store new accumulated values to state store`, hence two separate places for read and write.








[GitHub] [spark] HeartSaVioR commented on a change in pull request #26935: [SPARK-30294][SS] Explicitly defines read-only StateStore and optimize for HDFSBackedStateStore

2020-09-19 Thread GitBox


HeartSaVioR commented on a change in pull request #26935:
URL: https://github.com/apache/spark/pull/26935#discussion_r491331963



##
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala
##
@@ -76,7 +76,7 @@ class StateStoreRDD[T: ClassTag, U: ClassTag](
 
     store = StateStore.get(
       storeProviderId, keySchema, valueSchema, indexOrdinal, storeVersion,
-      storeConf, hadoopConfBroadcast.value.value)
+      storeConf, hadoopConfBroadcast.value.value, readOnly)
     val inputIter = dataRDD.iterator(partition, ctxt)
     storeUpdateFunction(store, inputIter)

Review comment:
   Despite the name, the function is used for R/W against the StateStore, so it still needs to be called for the read-only case as well. (For sure, both should provide different functions.)
   
   As I commented earlier, diverging the code paths should help make it clearer. I'll look into it.
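
   In other words, something like the following shape (names assumed), where each RDD flavor takes a function typed against the store it actually provides, so a read-only pipeline cannot even express a write:

   object StoreFunctionShapes {
     // What a read-only RDD would accept: no put/remove is reachable through the type.
     type StoreReadFunction[T, U] = (ReadOnlyStateStore, Iterator[T]) => Iterator[U]
     // What the writable RDD keeps accepting.
     type StoreUpdateFunction[T, U] = (StateStore, Iterator[T]) => Iterator[U]
   }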








[GitHub] [spark] HeartSaVioR commented on a change in pull request #26935: [SPARK-30294][SS] Explicitly defines read-only StateStore and optimize for HDFSBackedStateStore

2020-09-19 Thread GitBox


HeartSaVioR commented on a change in pull request #26935:
URL: https://github.com/apache/spark/pull/26935#discussion_r491331326



##
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
##
@@ -109,6 +109,41 @@ trait StateStore {
   def hasCommitted: Boolean
 }
 
+/** A versioned key-value store which is same as [[StateStore]], but write-protected. */

Review comment:
   read-only would be clearer. Will update.








[GitHub] [spark] HeartSaVioR commented on a change in pull request #26935: [SPARK-30294][SS] Explicitly defines read-only StateStore and optimize for HDFSBackedStateStore

2020-09-19 Thread GitBox


HeartSaVioR commented on a change in pull request #26935:
URL: https://github.com/apache/spark/pull/26935#discussion_r491330987



##
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
##
@@ -79,6 +78,23 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
   //   java.util.ConcurrentModificationException
   type MapType = java.util.concurrent.ConcurrentHashMap[UnsafeRow, UnsafeRow]
 
+  class HDFSBackedReadOnlyStateStore(val version: Long, map: MapType)
+    extends ReadOnlyStateStore {
+
+    override def id: StateStoreId = HDFSBackedStateStoreProvider.this.stateStoreId
+
+    override def get(key: UnsafeRow): UnsafeRow = map.get(key)
+
+    override def abort(): Unit = {}

Review comment:
   abort() could be called for ReadOnlyStateStore as well, as the code path is not diverged between ReadOnlyStateStore and StateStore, to minimize the code diff. But it looks like that may bring more confusion - probably better to separate the code paths clearly despite more code changes. I'll look into it again.




