[GitHub] [spark] HeartSaVioR commented on a change in pull request #26935: [SPARK-30294][SS] Explicitly defines read-only StateStore and optimize for HDFSBackedStateStore
HeartSaVioR commented on a change in pull request #26935:
URL: https://github.com/apache/spark/pull/26935#discussion_r514697997

File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala

## @@ -29,14 +29,51 @@
 import org.apache.spark.sql.internal.SessionState
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SerializableConfiguration

+// This doesn't directly override RDD methods as MiMa complains it.
+abstract class BaseStateStoreRDD[T: ClassTag, U: ClassTag](
+    dataRDD: RDD[T],
+    checkpointLocation: String,
+    queryRunId: UUID,
+    operatorId: Long,
+    sessionState: SessionState,
+    @transient private val storeCoordinator: Option[StateStoreCoordinatorRef],
+    extraOptions: Map[String, String] = Map.empty) extends RDD[U](dataRDD) {
+
+  protected val storeConf = new StateStoreConf(sessionState.conf, extraOptions)
+
+  // A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
+  protected val hadoopConfBroadcast = dataRDD.context.broadcast(
+    new SerializableConfiguration(sessionState.newHadoopConf()))
+
+  /** Implementations can simply call this method in getPreferredLocations. */
+  protected def _getPartitions: Array[Partition] = dataRDD.partitions

Review comment:
I don't mean that I'm unwilling to change it; I mean this shouldn't block the PR any longer when it can be done as a follow-up. If you'd like to volunteer to look into the issue, I'm happy with that, as a follow-up PR.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
HeartSaVioR commented on a change in pull request #26935:
URL: https://github.com/apache/spark/pull/26935#discussion_r514694190

File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala

(diff context identical to the hunk quoted in the comment above on this file, ending at `BaseStateStoreRDD._getPartitions`)

Review comment:
That matches my understanding, and I wouldn't do it in this PR. Based on the error message, what we would need to add is an exclusion for some pattern against RDD, and I'm not 100% sure everyone would agree with that. That's why I said "additional discussion".
HeartSaVioR commented on a change in pull request #26935:
URL: https://github.com/apache/spark/pull/26935#discussion_r514612885

File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala

## @@ -36,10 +36,14 @@
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.{ThreadUtils, Utils}

 /**
- * Base trait for a versioned key-value store. Each instance of a `StateStore` represents a specific
- * version of state data, and such instances are created through a [[StateStoreProvider]].
+ * Base trait for a versioned key-value store which provides read operations. Each instance of a
+ * `ReadStateStore` represents a specific version of state data, and such instances are created
+ * through a [[StateStoreProvider]].
+ *
+ * `abort` method will be called when the task is completed - please clean up the resources in
+ * the method.
  */
-trait StateStore {
+trait ReadStateStore {

Review comment:
Ah, sorry. I was reminded of the comment from @viirya and still think his comment makes more sense. I'll rename the others to be consistent for now, but I'm also open to renaming this to ReadOnlyStateStore, if both of you think that's better after reading @viirya's comment. @xuanyuanking @gaborgsomogyi Could you please go through the previous comment below and share what you think? https://github.com/apache/spark/pull/26935#discussion_r491747706
HeartSaVioR commented on a change in pull request #26935:
URL: https://github.com/apache/spark/pull/26935#discussion_r514609139

File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala

(diff context identical to the hunk quoted in the comment above on `trait ReadStateStore`)

Review comment:
OK, I'll rename it. No big deal.
HeartSaVioR commented on a change in pull request #26935:
URL: https://github.com/apache/spark/pull/26935#discussion_r514598660

File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala

## @@ -89,16 +116,16 @@
   def commit(): Long

   /**
-   * Abort all the updates that have been made to the store. Implementations should ensure that
-   * no more updates (puts, removes) can be after an abort in order to avoid incorrect usage.
+   * Return an iterator containing all the key-value pairs in the StateStore. Implementations must

Review comment:
Oh, I was just pointing out that the order is swapped. Will fix. By the way, I don't think the original order is good anyway: `iterator` should have been placed before `commit`, so that all read/write operations are adjacent and the commit lifecycle operations come later. That can go through another MINOR PR, though.
HeartSaVioR commented on a change in pull request #26935:
URL: https://github.com/apache/spark/pull/26935#discussion_r514596151

File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala

(diff context identical to the hunk quoted in the first comment above on this file, ending at `BaseStateStoreRDD._getPartitions`)

Review comment:
I wouldn't want to touch the RDD side just because of this change; that may bring unnecessary additional discussion. I don't see this as a blocker for this PR. Anyone can deal with it as a follow-up, with the possibility of discussion.
HeartSaVioR commented on a change in pull request #26935:
URL: https://github.com/apache/spark/pull/26935#discussion_r514594697

File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala

## @@ -389,7 +453,19 @@ object StateStore extends Logging {
       storeConf: StateStoreConf,
       hadoopConf: Configuration): StateStore = {

Review comment:
You seem to have missed the difference in return types between the two. They were actually one method, and I had to split it to support the different return types.
HeartSaVioR commented on a change in pull request #26935:
URL: https://github.com/apache/spark/pull/26935#discussion_r512665393

File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala

## @@ -197,15 +212,26 @@
   }

   /** Get the state store for making updates to create a new `version` of the store. */
-  override def getStore(version: Long): StateStore = synchronized {
+  override def getStore(version: Long): StateStore = {
+    val newMap = getLoadedMapForStore(version)
+    logInfo(s"Retrieved version $version of ${HDFSBackedStateStoreProvider.this} for update")
+    new HDFSBackedStateStore(version, newMap)
+  }
+
+  /** Get the state store for reading to specific `version` of the store. */
+  override def getReadOnlyStore(version: Long): ReadStateStore = {

Review comment:
The class was originally named ReadOnlyStateStore, and a review comment asked me to rename it to ReadStateStore. If we'd like to keep things consistent, I'll change this to `getReadStore`.
HeartSaVioR commented on a change in pull request #26935:
URL: https://github.com/apache/spark/pull/26935#discussion_r512329977

File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala

## @@ -29,14 +29,51 @@
 import org.apache.spark.sql.internal.SessionState
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SerializableConfiguration

+// This doesn't directly override RDD methods as MiMa complains it.
+abstract class BaseStateStoreRDD[T: ClassTag, U: ClassTag](
+    dataRDD: RDD[T],
+    checkpointLocation: String,
+    queryRunId: UUID,
+    operatorId: Long,
+    sessionState: SessionState,
+    @transient private val storeCoordinator: Option[StateStoreCoordinatorRef],
+    extraOptions: Map[String, String] = Map.empty) extends RDD[U](dataRDD) {
+
+  protected val storeConf = new StateStoreConf(sessionState.conf, extraOptions)
+
+  // A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
+  protected val hadoopConfBroadcast = dataRDD.context.broadcast(
+    new SerializableConfiguration(sessionState.newHadoopConf()))
+
+  /** Implementations can simply call this method in getPreferredLocations. */
+  protected def _getPartitions: Array[Partition] = dataRDD.partitions
+
+  /**
+   * Set the preferred location of each partition using the executor that has the related
+   * [[StateStoreProvider]] already loaded.
+   *
+   * Implementations can simply call this method in getPreferredLocations.
+   */
+  protected def _getPreferredLocations(partition: Partition): Seq[String] = {

Review comment:
Same here.
HeartSaVioR commented on a change in pull request #26935:
URL: https://github.com/apache/spark/pull/26935#discussion_r512329942

File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala

(diff context identical to the hunk quoted in the first comment above on this file, ending at `BaseStateStoreRDD._getPartitions`)

Review comment:
As I noted in the class doc, MiMa somehow complained about it. Not 100% sure why.
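To make the workaround under discussion concrete, here is a minimal, self-contained sketch of the pattern the quoted diff uses. The types below are stubs and the class bodies are illustrative only (not the actual Spark code): the abstract base class exposes a protected helper instead of overriding the RDD method directly, and each concrete subclass performs the trivial override itself, so the binary-compatibility checker never sees an inherited override on the abstract class.

```scala
// Stub stand-ins so the sketch compiles on its own (not the Spark types).
case class Partition(index: Int)

abstract class RDD[U] {
  protected def getPartitions: Array[Partition]
}

// The abstract base avoids overriding getPartitions directly (MiMa flagged
// such an override); it only offers a protected helper with the shared logic.
abstract class BaseStateStoreRDDSketch[U](dataPartitions: Array[Partition]) extends RDD[U] {
  protected def _getPartitions: Array[Partition] = dataPartitions
}

// Each concrete subclass does the one-line override, delegating to the helper.
class StateStoreRDDSketch(parts: Array[Partition]) extends BaseStateStoreRDDSketch[Int](parts) {
  override protected def getPartitions: Array[Partition] = _getPartitions
}
```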
HeartSaVioR commented on a change in pull request #26935:
URL: https://github.com/apache/spark/pull/26935#discussion_r512329430

File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala

## @@ -379,6 +428,21 @@ object StateStore extends Logging {
   @GuardedBy("loadedProviders")
   private var _coordRef: StateStoreCoordinatorRef = null

+  /** Get or create a read-only store associated with the id. */
+  def getReadOnly(
+      storeProviderId: StateStoreProviderId,
+      keySchema: StructType,
+      valueSchema: StructType,
+      indexOrdinal: Option[Int],
+      version: Long,
+      storeConf: StateStoreConf,
+      hadoopConf: Configuration): ReadStateStore = {
+    require(version >= 0)

Review comment:
`getStateStoreProvider` doesn't relate to `version`, right? I wouldn't couple the two just to reduce code duplication.
HeartSaVioR commented on a change in pull request #26935:
URL: https://github.com/apache/spark/pull/26935#discussion_r512328018

File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala

## @@ -89,16 +116,16 @@
   def commit(): Long

   /**
-   * Abort all the updates that have been made to the store. Implementations should ensure that
-   * no more updates (puts, removes) can be after an abort in order to avoid incorrect usage.
+   * Return an iterator containing all the key-value pairs in the StateStore. Implementations must
+   * ensure that updates (puts, removes) can be made while iterating over this iterator.
    */
-  def abort(): Unit
+  override def iterator(): Iterator[UnsafeRowPair]

   /**
-   * Return an iterator containing all the key-value pairs in the StateStore. Implementations must
-   * ensure that updates (puts, removes) can be made while iterating over this iterator.
+   * Abort all the updates that have been made to the store. Implementations should ensure that
+   * no more updates (puts, removes) can be after an abort in order to avoid incorrect usage.
    */
-  def iterator(): Iterator[UnsafeRowPair]
+  override def abort(): Unit

Review comment:
The pattern `prepare -> commit or abort -> (abort if commit fails)` is widely used across Spark, where both `commit` and `abort` have the responsibility of cleaning up (`abort` needs to be careful not to be affected by double cleanup). In ReadStateStore, the pattern is simplified to `prepare -> abort`, where `abort` has the responsibility of cleaning up.

I see one case where the issue may come up:
- `class A implements ReadStateStore`
- `class B extends A implements StateStore`
- Both A and B have their own resources that need to be cleaned up.

In this case, correctly cleaning up A's resources from B becomes non-trivial. B can call `A.abort` in both `B.commit` and `B.abort` to make sure A's resources are cleaned up, but it is B's responsibility not to call `A.abort` multiple times, as A assumes there's no double cleanup under the simplified pattern.

The answer is probably the same as before: we know the problem, and we need a `close` method, but backward compatibility also matters, as we would have to provide a default implementation of `close`, which is very tricky for the current implementations (`close` would have to rely on the resource cleanup in `commit`/`abort`), and we decided not to.

My feeling is that once a provider has different implementations between ReadStateStore and StateStore, they would be separate implementations (no inheritance between them). If that isn't certain and we don't want to leave the possibility open, then adding `close` could probably be reconsidered.
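A minimal sketch of the double-cleanup guard described above. `A` and `B` mirror the hypothetical classes in the comment, and the resource handling is illustrative only (not real Spark code): an idempotency flag in `A.abort` makes it safe for `B` to trigger the cleanup from both `commit` and `abort`.

```scala
// A provides read-only state and owns some resource; abort() is guarded so
// calling it more than once releases the resource only once.
class A {
  private var cleaned = false
  def abort(): Unit = {
    if (!cleaned) {
      cleaned = true
      // release A's resource here; the flag makes this idempotent
    }
  }
}

// B adds write support on top of A and owns its own resource. It reuses
// A.abort for cleanup from both commit and abort, relying on the guard above.
class B extends A {
  def commit(): Long = {
    // ... persist the new version ...
    super.abort() // clean up A's resource after a successful commit
    1L
  }
  override def abort(): Unit = {
    super.abort() // safe even if commit already triggered the cleanup
    // release B's own resource here
  }
}
```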
HeartSaVioR commented on a change in pull request #26935:
URL: https://github.com/apache/spark/pull/26935#discussion_r512313839

File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala

## @@ -89,16 +116,16 @@
   def commit(): Long

   /**
-   * Abort all the updates that have been made to the store. Implementations should ensure that
-   * no more updates (puts, removes) can be after an abort in order to avoid incorrect usage.
+   * Return an iterator containing all the key-value pairs in the StateStore. Implementations must
+   * ensure that updates (puts, removes) can be made while iterating over this iterator.
    */
-  def abort(): Unit
+  override def iterator(): Iterator[UnsafeRowPair]

Review comment:
Yes. Unfortunately, we've figured out that the requirements for the `iterator` implementation are a bit different between the two. That said, someone could leverage the fact that the iterator in the read-only store doesn't need to care about updates.
HeartSaVioR commented on a change in pull request #26935:
URL: https://github.com/apache/spark/pull/26935#discussion_r512312263

File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala

## @@ -81,6 +74,40 @@
     iterator()
   }

+  /** Return an iterator containing all the key-value pairs in the StateStore. */
+  def iterator(): Iterator[UnsafeRowPair]
+
+  /**
+   * Clean up the resource.
+   *
+   * The method name is to respect backward compatibility on [[StateStore]].

Review comment:
`abort` is the only method that isn't properly named from the point of view of ReadStateStore. It should be `close`, but we found it tricky to respect backward compatibility while adding `close`, hence we decided to leave it as it is. Please refer to https://github.com/apache/spark/pull/26935#discussion_r493236101
HeartSaVioR commented on a change in pull request #26935:
URL: https://github.com/apache/spark/pull/26935#discussion_r507380414

File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala

## @@ -81,6 +74,43 @@
     iterator()
   }

+  /**
+   * Return an iterator containing all the key-value pairs in the StateStore. Implementations must
+   * ensure that updates (puts, removes) can be made while iterating over this iterator.

Review comment:
Yeah, right. Let me split `ReadStateStore` and `StateStore`.
HeartSaVioR commented on a change in pull request #26935:
URL: https://github.com/apache/spark/pull/26935#discussion_r498784149

File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala

## @@ -29,6 +29,57 @@
 import org.apache.spark.sql.internal.SessionState
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SerializableConfiguration

+/**
+ * An RDD that allows computations to be executed against [[ReadOnlyStateStore]]s. It
+ * uses the [[StateStoreCoordinator]] to get the locations of loaded state stores
+ * and use that as the preferred locations.
+ */
+class ReadOnlyStateStoreRDD[T: ClassTag, U: ClassTag](
+    dataRDD: RDD[T],
+    storeReadFunction: (ReadOnlyStateStore, Iterator[T]) => Iterator[U],
+    checkpointLocation: String,
+    queryRunId: UUID,
+    operatorId: Long,
+    storeVersion: Long,
+    keySchema: StructType,
+    valueSchema: StructType,
+    indexOrdinal: Option[Int],
+    sessionState: SessionState,
+    @transient private val storeCoordinator: Option[StateStoreCoordinatorRef],
+    extraOptions: Map[String, String] = Map.empty) extends RDD[U](dataRDD) {
+
+  private val storeConf = new StateStoreConf(sessionState.conf, extraOptions)
+
+  // A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
+  private val hadoopConfBroadcast = dataRDD.context.broadcast(
+    new SerializableConfiguration(sessionState.newHadoopConf()))
+
+  override protected def getPartitions: Array[Partition] = dataRDD.partitions
+
+  /**
+   * Set the preferred location of each partition using the executor that has the related
+   * [[StateStoreProvider]] already loaded.
+   */
+  override def getPreferredLocations(partition: Partition): Seq[String] = {
+    val stateStoreProviderId = StateStoreProviderId(
+      StateStoreId(checkpointLocation, operatorId, partition.index),
+      queryRunId)
+    storeCoordinator.flatMap(_.getLocation(stateStoreProviderId)).toSeq
+  }
+
+  override def compute(partition: Partition, ctxt: TaskContext): Iterator[U] = {
+    val storeProviderId = StateStoreProviderId(
+      StateStoreId(checkpointLocation, operatorId, partition.index),
+      queryRunId)
+
+    val store = StateStore.getReadOnly(
+      storeProviderId, keySchema, valueSchema, indexOrdinal, storeVersion,

Review comment:
OK, just reverted 5c70db0.
HeartSaVioR commented on a change in pull request #26935:
URL: https://github.com/apache/spark/pull/26935#discussion_r493236101

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala

## @@ -29,6 +29,57 @@
 import org.apache.spark.sql.internal.SessionState
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SerializableConfiguration
+/**
+ * An RDD that allows computations to be executed against [[ReadOnlyStateStore]]s. It
+ * uses the [[StateStoreCoordinator]] to get the locations of loaded state stores
+ * and use that as the preferred locations.
+ */
+class ReadOnlyStateStoreRDD[T: ClassTag, U: ClassTag](
+    dataRDD: RDD[T],
+    storeReadFunction: (ReadOnlyStateStore, Iterator[T]) => Iterator[U],
+    checkpointLocation: String,
+    queryRunId: UUID,
+    operatorId: Long,
+    storeVersion: Long,
+    keySchema: StructType,
+    valueSchema: StructType,
+    indexOrdinal: Option[Int],
+    sessionState: SessionState,
+    @transient private val storeCoordinator: Option[StateStoreCoordinatorRef],
+    extraOptions: Map[String, String] = Map.empty) extends RDD[U](dataRDD) {
+
+  private val storeConf = new StateStoreConf(sessionState.conf, extraOptions)
+
+  // A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
+  private val hadoopConfBroadcast = dataRDD.context.broadcast(
+    new SerializableConfiguration(sessionState.newHadoopConf()))
+
+  override protected def getPartitions: Array[Partition] = dataRDD.partitions
+
+  /**
+   * Set the preferred location of each partition using the executor that has the related
+   * [[StateStoreProvider]] already loaded.
+   */
+  override def getPreferredLocations(partition: Partition): Seq[String] = {
+    val stateStoreProviderId = StateStoreProviderId(
+      StateStoreId(checkpointLocation, operatorId, partition.index),
+      queryRunId)
+    storeCoordinator.flatMap(_.getLocation(stateStoreProviderId)).toSeq
+  }
+
+  override def compute(partition: Partition, ctxt: TaskContext): Iterator[U] = {
+    val storeProviderId = StateStoreProviderId(
+      StateStoreId(checkpointLocation, operatorId, partition.index),
+      queryRunId)
+
+    val store = StateStore.getReadOnly(
+      storeProviderId, keySchema, valueSchema, indexOrdinal, storeVersion,

Review comment: Just dealt with it in 5c70db0. The `close` method throws UnsupportedOperationException by default, and callers catch it. The change handles backward compatibility as follows:

1. If an implementation of StateStore isn't aware of the `close` method, it should have implemented resource cleanup in `commit` and `abort`, and Spark will call those to clean up resources. Spark will also call `close`, which will be a no-op. For a read-only state store instance, it would use WrappedReadStateStore, whose `close` method calls both `abort` and `close`; `abort` should handle the resource cleanup.

2. If an implementation of StateStore is aware of the `close` method and implements it correctly, it should have migrated the resource cleanup to `close`, and Spark will call it to clean up resources. For a read-only state store instance, it may override `StateStoreProvider.getReadOnlyStore` (as HDFSBackedStateStoreProvider does) to provide its own ReadOnlyStore implementation. Or it may still use WrappedReadStateStore, whose `close` method calls both `abort` and `close`; `abort` would probably be a no-op after migrating the resource cleanup to `close`, and `close` should handle the cleanup.

If this sounds too complicated, let's revert 5c70db0 and live with the `abort` method.

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] HeartSaVioR commented on a change in pull request #26935: [SPARK-30294][SS] Explicitly defines read-only StateStore and optimize for HDFSBackedStateStore
HeartSaVioR commented on a change in pull request #26935:
URL: https://github.com/apache/spark/pull/26935#discussion_r491746910

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala

## @@ -81,6 +74,42 @@ trait StateStore {
     iterator()
   }
+
+  /**
+   * Return an iterator containing all the key-value pairs in the StateStore. Implementations must
+   * ensure that updates (puts, removes) can be made while iterating over this iterator.
+   */
+  def iterator(): Iterator[UnsafeRowPair]
+
+  /**
+   * Clean up the resource.
+   *
+   * The method name is to respect backward compatibility on [[StateStore]].
+   */
+  def abort(): Unit
+}
+
+/**
+ * Base trait for a versioned key-value store. Each instance of a `StateStore` represents a specific
+ * version of state data, and such instances are created through a [[StateStoreProvider]].
+ *
+ * Unlike [[ReadOnlyStateStore]], `abort` method may not be called if the `commit` method succeeds
+ * to commit the change. (`hasCommitted` returns `true`.) Otherwise, `abort` method will be called.
+ * Implementation should deal with resource cleanup in both methods, but also need to guard with
+ * double resource cleanup.
+ */
+trait StateStore extends ReadOnlyStateStore {

Review comment: Yes, because existing implementations only have StateStore and we have to support backward compatibility. Please share your idea if you have a better one. I think this change is clearer than the initial version of the PR - we now enforce ReadOnlyStateStore so that writes are simply impossible at the interface level. (Previously I had added a test to verify that any writes throw an exception; I removed it since writes are now impossible by construction.)

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala

## @@ -81,6 +74,42 @@ trait StateStore {
     iterator()
   }
+
+  /**
+   * Return an iterator containing all the key-value pairs in the StateStore. Implementations must
+   * ensure that updates (puts, removes) can be made while iterating over this iterator.
+   */
+  def iterator(): Iterator[UnsafeRowPair]
+
+  /**
+   * Clean up the resource.
+   *
+   * The method name is to respect backward compatibility on [[StateStore]].
+   */
+  def abort(): Unit
+}
+
+/**
+ * Base trait for a versioned key-value store. Each instance of a `StateStore` represents a specific
+ * version of state data, and such instances are created through a [[StateStoreProvider]].
+ *
+ * Unlike [[ReadOnlyStateStore]], `abort` method may not be called if the `commit` method succeeds
+ * to commit the change. (`hasCommitted` returns `true`.) Otherwise, `abort` method will be called.

Review comment: I agree, but not breaking backward compatibility is important, as I know of several "ecosystem" implementations. That's why I added different javadoc for the two. If I had a chance to redesign, I would also add close(), to be called to clean up resources regardless of which method (commit/abort) was called - so for ReadOnlyStateStore there would be no need for abort() and only close() would be called, and for StateStore, close() would be called after commit() and/or abort().

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala

## @@ -29,6 +29,57 @@
 import org.apache.spark.sql.internal.SessionState
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SerializableConfiguration
+/**
+ * An RDD that allows computations to be executed against [[ReadOnlyStateStore]]s. It
+ * uses the [[StateStoreCoordinator]] to get the locations of loaded state stores
+ * and use that as the preferred locations.
+ */
+class ReadOnlyStateStoreRDD[T: ClassTag, U: ClassTag](
+    dataRDD: RDD[T],
+    storeReadFunction: (ReadOnlyStateStore, Iterator[T]) => Iterator[U],
+    checkpointLocation: String,
+    queryRunId: UUID,
+    operatorId: Long,
+    storeVersion: Long,
+    keySchema: StructType,
+    valueSchema: StructType,
+    indexOrdinal: Option[Int],
+    sessionState: SessionState,
+    @transient private val storeCoordinator: Option[StateStoreCoordinatorRef],
+    extraOptions: Map[String, String] = Map.empty) extends RDD[U](dataRDD) {
+
+  private val storeConf = new StateStoreConf(sessionState.conf, extraOptions)
+
+  // A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
+  private val hadoopConfBroadcast = dataRDD.context.broadcast(
+    new SerializableConfiguration(sessionState.newHadoopConf()))
+
+  override protected def getPartitions: Array[Partition] = dataRDD.partitions
+
+  /**
+   * Set the preferred location of each partition using the executor that has the related
+   * [[StateStoreProvider]] already loaded.
+   */
+  override def getPreferredLocations(partition: Partition): Seq[String] = {
+    val stateStoreProviderId = StateStoreProviderId(
+      StateStoreId(checkpointLocation, operatorId, partition.index),
+      queryRunId)
+    storeCoordinator.flatMap(_.getLocation(stateStoreProviderId)).toSeq
+  }
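The interface-level write protection discussed above - a read-only base trait that the writable store extends - can be illustrated with simplified stand-in types. This is a sketch under assumed signatures (String keys instead of UnsafeRow; all names here are hypothetical, not Spark's actual traits).

```scala
// Read-only view: no put/remove/commit in the interface at all.
trait ReadStateStore {
  def get(key: String): Option[String]
  def iterator(): Iterator[(String, String)]
  def abort(): Unit
}

// The writable store extends the read-only trait, so existing implementations
// stay source-compatible while readers can be typed as ReadStateStore.
trait WritableStateStore extends ReadStateStore {
  def put(key: String, value: String): Unit
  def commit(): Long
}

class InMemoryStore extends WritableStateStore {
  private val data = scala.collection.mutable.Map.empty[String, String]
  def get(key: String): Option[String] = data.get(key)
  def iterator(): Iterator[(String, String)] = data.iterator
  def put(key: String, value: String): Unit = data(key) = value
  def commit(): Long = data.size.toLong
  def abort(): Unit = data.clear()
}

// A function typed against ReadStateStore cannot call put() -- the compiler
// enforces read-only access, so no runtime "writes must throw" test is needed.
def readOnlyLookup(store: ReadStateStore, key: String): Option[String] = store.get(key)

val s = new InMemoryStore
s.put("a", "1")
assert(readOnlyLookup(s, "a") == Some("1"))
```

This mirrors the point in the comment: the old "verify writes throw" test becomes unnecessary once writes are unrepresentable in the read-only type.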
HeartSaVioR commented on a change in pull request #26935:
URL: https://github.com/apache/spark/pull/26935#discussion_r491802750

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala

## @@ -81,6 +74,42 @@ trait StateStore {
     iterator()
   }
+
+  /**
+   * Return an iterator containing all the key-value pairs in the StateStore. Implementations must
+   * ensure that updates (puts, removes) can be made while iterating over this iterator.
+   */
+  def iterator(): Iterator[UnsafeRowPair]
+
+  /**
+   * Clean up the resource.
+   *
+   * The method name is to respect backward compatibility on [[StateStore]].
+   */
+  def abort(): Unit
+}
+
+/**
+ * Base trait for a versioned key-value store. Each instance of a `StateStore` represents a specific
+ * version of state data, and such instances are created through a [[StateStoreProvider]].
+ *
+ * Unlike [[ReadOnlyStateStore]], `abort` method may not be called if the `commit` method succeeds
+ * to commit the change. (`hasCommitted` returns `true`.) Otherwise, `abort` method will be called.

Review comment: If we really want to be sure, we can add close() with an empty default implementation, and have Spark guarantee to call it regardless of whether commit()/abort() was called. For existing implementations, we can make close() call abort() in WrappedReadStateStore, so that abort() is called once and cleans up resources. Would that make sense to you?
HeartSaVioR commented on a change in pull request #26935:
URL: https://github.com/apache/spark/pull/26935#discussion_r491748504

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala

## @@ -81,6 +74,42 @@ trait StateStore {
     iterator()
   }
+
+  /**
+   * Return an iterator containing all the key-value pairs in the StateStore. Implementations must
+   * ensure that updates (puts, removes) can be made while iterating over this iterator.
+   */
+  def iterator(): Iterator[UnsafeRowPair]
+
+  /**
+   * Clean up the resource.
+   *
+   * The method name is to respect backward compatibility on [[StateStore]].
+   */
+  def abort(): Unit
+}
+
+/**
+ * Base trait for a versioned key-value store. Each instance of a `StateStore` represents a specific
+ * version of state data, and such instances are created through a [[StateStoreProvider]].
+ *
+ * Unlike [[ReadOnlyStateStore]], `abort` method may not be called if the `commit` method succeeds
+ * to commit the change. (`hasCommitted` returns `true`.) Otherwise, `abort` method will be called.
+ * Implementation should deal with resource cleanup in both methods, but also need to guard with
+ * double resource cleanup.
+ */
+trait StateStore extends ReadOnlyStateStore {

Review comment: Ah OK, the interface name seems to bring confusion. `ReadStateStore` looks OK to me. Thanks!
HeartSaVioR commented on a change in pull request #26935:
URL: https://github.com/apache/spark/pull/26935#discussion_r491748211

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala

## @@ -29,6 +29,57 @@
 import org.apache.spark.sql.internal.SessionState
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SerializableConfiguration
+/**
+ * An RDD that allows computations to be executed against [[ReadOnlyStateStore]]s. It
+ * uses the [[StateStoreCoordinator]] to get the locations of loaded state stores
+ * and use that as the preferred locations.
+ */
+class ReadOnlyStateStoreRDD[T: ClassTag, U: ClassTag](
+    dataRDD: RDD[T],
+    storeReadFunction: (ReadOnlyStateStore, Iterator[T]) => Iterator[U],
+    checkpointLocation: String,
+    queryRunId: UUID,
+    operatorId: Long,
+    storeVersion: Long,
+    keySchema: StructType,
+    valueSchema: StructType,
+    indexOrdinal: Option[Int],
+    sessionState: SessionState,
+    @transient private val storeCoordinator: Option[StateStoreCoordinatorRef],
+    extraOptions: Map[String, String] = Map.empty) extends RDD[U](dataRDD) {
+
+  private val storeConf = new StateStoreConf(sessionState.conf, extraOptions)
+
+  // A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
+  private val hadoopConfBroadcast = dataRDD.context.broadcast(
+    new SerializableConfiguration(sessionState.newHadoopConf()))
+
+  override protected def getPartitions: Array[Partition] = dataRDD.partitions
+
+  /**
+   * Set the preferred location of each partition using the executor that has the related
+   * [[StateStoreProvider]] already loaded.
+   */
+  override def getPreferredLocations(partition: Partition): Seq[String] = {
+    val stateStoreProviderId = StateStoreProviderId(
+      StateStoreId(checkpointLocation, operatorId, partition.index),
+      queryRunId)
+    storeCoordinator.flatMap(_.getLocation(stateStoreProviderId)).toSeq
+  }
+
+  override def compute(partition: Partition, ctxt: TaskContext): Iterator[U] = {
+    val storeProviderId = StateStoreProviderId(
+      StateStoreId(checkpointLocation, operatorId, partition.index),
+      queryRunId)
+
+    val store = StateStore.getReadOnly(
+      storeProviderId, keySchema, valueSchema, indexOrdinal, storeVersion,

Review comment: The return type differs between the two, and the type of the state function also differs. That's the main intent of the new changes - storeReadFunction can never modify the state since it receives a ReadOnlyStateStore, whereas storeUpdateFunction can. I agree that a lot of code is duplicated - that's why I extracted an abstract class, and as you saw, MiMa complained. I don't know why MiMa complained, as I don't see anything weird in the previous code, but I might be missing something, so I had to make MiMa happy for now.
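The split described in this comment can be modeled with toy types: the read path and the update path accept functions over different store interfaces, while the shared plumbing lives in an extracted base class (the piece MiMa complained about in the actual PR). Everything below is a simplified, hypothetical stand-in for the real RDDs and traits.

```scala
// Stand-ins for ReadOnlyStateStore / StateStore.
trait RStore { def get(k: String): Option[String] }
trait RWStore extends RStore { def put(k: String, v: String): Unit }

// Shared fields/helpers in a base class, mirroring the abstract-class extraction.
abstract class BaseTask[T, U](protected val input: Seq[T]) {
  protected def partitionCount: Int = input.size // shared helper, like _getPartitions
}

// Read path: its function can only see the read-only interface.
class ReadTask[T, U](input: Seq[T], f: (RStore, Seq[T]) => U)
    extends BaseTask[T, U](input) {
  def run(store: RStore): U = f(store, input)
}

// Update path: its function receives the writable interface.
class UpdateTask[T, U](input: Seq[T], f: (RWStore, Seq[T]) => U)
    extends BaseTask[T, U](input) {
  def run(store: RWStore): U = f(store, input)
}

class MapStore extends RWStore {
  private val m = scala.collection.mutable.Map.empty[String, String]
  def get(k: String): Option[String] = m.get(k)
  def put(k: String, v: String): Unit = m(k) = v
}

val store = new MapStore
new UpdateTask[String, Unit](Seq("a"), (s, xs) => xs.foreach(x => s.put(x, x))).run(store)
val got = new ReadTask[String, Option[String]](Seq("a"), (s, xs) => s.get(xs.head)).run(store)
assert(got == Some("a"))
```

Passing a write to `ReadTask`'s function is a compile error, which is exactly the guarantee the PR moves from tests into the type system.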
HeartSaVioR commented on a change in pull request #26935:
URL: https://github.com/apache/spark/pull/26935#discussion_r491332970

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala

## @@ -253,7 +253,8 @@ case class StateStoreRestoreExec(
       stateManager.getStateValueSchema,
       indexOrdinal = None,
       sqlContext.sessionState,
-      Some(sqlContext.streams.stateStoreCoordinator)) { case (store, iter) =>
+      Some(sqlContext.streams.stateStoreCoordinator),
+      readOnly = true) { case (store, iter) =>

Review comment: Yes. All other cases do the read and write in the same place. Streaming aggregation handles aggregation as `do aggregation in each partition` -> `shuffle` -> `enrich accumulated values via loading from state store` -> `do aggregation` -> `store new accumulated values to state store`, hence two separate places for read and write.
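The read-then-write flow for streaming aggregation described above can be mimicked with plain collections. In this sketch a mutable Map stands in for the state store, and the "shuffle" step is implicit in groupBy; all names are illustrative.

```scala
// Accumulated counts from earlier micro-batches (stand-in for the state store).
val prevState = scala.collection.mutable.Map("a" -> 10L)

val batch = Seq("a", "b", "a")

// 1) partial aggregation within each "partition"; 2) "shuffle" is implicit here
val partial = batch.groupBy(identity).map { case (k, vs) => k -> vs.size.toLong }

// 3) enrich with prior accumulated values (the read-only pass: StateStoreRestoreExec)
// 4) merge, then 5) store new accumulated values back (the read-write pass: StateStoreSaveExec)
val merged = partial.map { case (k, n) => k -> (prevState.getOrElse(k, 0L) + n) }
merged.foreach { case (k, n) => prevState(k) = n }

assert(prevState("a") == 12L && prevState("b") == 1L)
```

Step 3 only ever reads, which is why StateStoreRestoreExec is the one operator that can be handed a read-only store while the save step keeps the writable one.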
HeartSaVioR commented on a change in pull request #26935:
URL: https://github.com/apache/spark/pull/26935#discussion_r491331963

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala

## @@ -76,7 +76,7 @@ class StateStoreRDD[T: ClassTag, U: ClassTag](
     store = StateStore.get(
       storeProviderId, keySchema, valueSchema, indexOrdinal, storeVersion,
-      storeConf, hadoopConfBroadcast.value.value)
+      storeConf, hadoopConfBroadcast.value.value, readOnly)
     val inputIter = dataRDD.iterator(partition, ctxt)
     storeUpdateFunction(store, inputIter)

Review comment: Despite the name, the function is used for both reads and writes against the StateStore, so it still needs to be called for the read-only case as well. (To be sure, the two cases should provide different functions.) As I commented earlier, diverging the code paths should help make this clearer. I'll look into it.
HeartSaVioR commented on a change in pull request #26935:
URL: https://github.com/apache/spark/pull/26935#discussion_r491331326

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala

## @@ -109,6 +109,41 @@
   def hasCommitted: Boolean
 }
+
+/** A versioned key-value store which is same as [[StateStore]], but write-protected. */

Review comment: read-only would be clearer. Will update.
HeartSaVioR commented on a change in pull request #26935:
URL: https://github.com/apache/spark/pull/26935#discussion_r491330987

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala

## @@ -79,6 +78,23 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
   // java.util.ConcurrentModificationException
   type MapType = java.util.concurrent.ConcurrentHashMap[UnsafeRow, UnsafeRow]
+
+  class HDFSBackedReadOnlyStateStore(val version: Long, map: MapType)
+    extends ReadOnlyStateStore {
+
+    override def id: StateStoreId = HDFSBackedStateStoreProvider.this.stateStoreId
+
+    override def get(key: UnsafeRow): UnsafeRow = map.get(key)
+
+    override def abort(): Unit = {}

Review comment: abort() could be called for a ReadOnlyStateStore as well, as the code path is not diverged between ReadOnlyStateStore/StateStore, to minimize the code diff. But it looks like that may bring more confusion - it's probably better to separate the code paths clearly despite more code changes. I'll look into it again.
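The pattern in this diff - a provider-specific read-only store whose `get` delegates to an already-loaded version map and whose `abort` is a no-op - can be sketched with simplified types. String keys stand in for UnsafeRow, and the class name here is hypothetical, not the actual Spark class.

```scala
import java.util.concurrent.ConcurrentHashMap

// Read-only view over a version map that the provider has already loaded.
// A reader stages no changes, so abort() has nothing to roll back.
class ReadOnlyMapStore(val version: Long, map: ConcurrentHashMap[String, String]) {
  def get(key: String): String = map.get(key) // null when absent, like the map itself
  def abort(): Unit = {} // no-op: nothing staged, nothing to undo
}

val m = new ConcurrentHashMap[String, String]()
m.put("k", "v")
val store = new ReadOnlyMapStore(5L, m)
assert(store.get("k") == "v")
store.abort() // safe to call any number of times
assert(store.version == 5L)
```

This is also why a provider-specific read-only store can be cheaper than wrapping the writable one: it skips the copy-on-write bookkeeping a writable HDFS-backed store sets up.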