Re: [PR] [SPARK-45762][CORE] Support shuffle managers defined in user jars by changing startup order [spark]
mridulm commented on PR #43627: URL: https://github.com/apache/spark/pull/43627#issuecomment-1931474172 @dongjoon-hyun, it is `@DeveloperApi` from the point of view of usage - `SparkEnv` is not expected to be created by users, as some of the constructor parameters are not externally visible (`RpcEnv`, for example, cannot be created as it is `private[spark]`). There have been changes to its constructor in the past as well, after it was marked `@DeveloperApi` - though to be fair, these were a while back. In general, I am conflicted about trying to preserve compatibility for things which are clearly private to Spark - it inhibits the ability of the project to evolve, especially around major version boundaries (though we do have a lot of these instances where we try to maintain compatibility). Given how long `SparkEnv` has been around, I can see a valid case being made for adding a constructor which preserves the earlier signature. Thoughts @tgravescs ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-46865][SS] Add Batch Support for TransformWithState Operator [spark]
anishshri-db commented on code in PR #44884: URL: https://github.com/apache/spark/pull/44884#discussion_r1480989833 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala: ## @@ -155,23 +163,114 @@ case class TransformWithStateExec( override protected def doExecute(): RDD[InternalRow] = { metrics // force lazy init at driver -child.execute().mapPartitionsWithStateStore[InternalRow]( - getStateInfo, - schemaForKeyRow, - schemaForValueRow, - numColsPrefixKey = 0, - session.sqlContext.sessionState, - Some(session.sqlContext.streams.stateStoreCoordinator), - useColumnFamilies = true -) { - case (store: StateStore, singleIterator: Iterator[InternalRow]) => -val processorHandle = new StatefulProcessorHandleImpl(store, getStateInfo.queryRunId, - keyEncoder) -assert(processorHandle.getHandleState == StatefulProcessorHandleState.CREATED) -statefulProcessor.init(processorHandle, outputMode) - processorHandle.setHandleState(StatefulProcessorHandleState.INITIALIZED) -val result = processDataWithPartition(singleIterator, store, processorHandle) -result +val broadcastedHadoopConf = Review Comment: +1
[PR] [SPARK-43117][CONNECT] Make `ProtoUtils.abbreviate` support repeated fields [spark]
zhengruifeng opened a new pull request, #45056: URL: https://github.com/apache/spark/pull/45056 ### What changes were proposed in this pull request? Make `ProtoUtils.abbreviate` support repeated fields. ### Why are the changes needed? The existing implementation does not work for repeated fields (strings/messages). We don't have `repeated bytes` in Spark Connect for now, so that case is left alone. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Added UTs. ### Was this patch authored or co-authored using generative AI tooling? No.
Re: [PR] [SPARK-46996][SQL] Allow AQE coalesce final stage in SQL cached plan [spark]
cloud-fan commented on PR #45054: URL: https://github.com/apache/spark/pull/45054#issuecomment-1931369315 cc @yaooqinn
[PR] [SPARK-46997][CORE] Enable `spark.worker.cleanup.enabled` by default [spark]
dongjoon-hyun opened a new pull request, #45055: URL: https://github.com/apache/spark/pull/45055 ### What changes were proposed in this pull request? ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? ### How was this patch tested? ### Was this patch authored or co-authored using generative AI tooling?
Re: [PR] [SPARK-46865][SS] Add Batch Support for TransformWithState Operator [spark]
HeartSaVioR commented on code in PR #44884: URL: https://github.com/apache/spark/pull/44884#discussion_r1480916745 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala: ## @@ -155,23 +163,114 @@ case class TransformWithStateExec( override protected def doExecute(): RDD[InternalRow] = { metrics // force lazy init at driver -child.execute().mapPartitionsWithStateStore[InternalRow]( - getStateInfo, - schemaForKeyRow, - schemaForValueRow, - numColsPrefixKey = 0, - session.sqlContext.sessionState, - Some(session.sqlContext.streams.stateStoreCoordinator), - useColumnFamilies = true -) { - case (store: StateStore, singleIterator: Iterator[InternalRow]) => -val processorHandle = new StatefulProcessorHandleImpl(store, getStateInfo.queryRunId, - keyEncoder) -assert(processorHandle.getHandleState == StatefulProcessorHandleState.CREATED) -statefulProcessor.init(processorHandle, outputMode) - processorHandle.setHandleState(StatefulProcessorHandleState.INITIALIZED) -val result = processDataWithPartition(singleIterator, store, processorHandle) -result +val broadcastedHadoopConf = + new SerializableConfiguration(session.sessionState.newHadoopConf()) +if (isStreaming) { + child.execute().mapPartitionsWithStateStore[InternalRow]( +getStateInfo, +schemaForKeyRow, +schemaForValueRow, +numColsPrefixKey = 0, +session.sqlContext.sessionState, +Some(session.sqlContext.streams.stateStoreCoordinator), +useColumnFamilies = true + ) { +case (store: StateStore, singleIterator: Iterator[InternalRow]) => + processData(store, singleIterator) + } +} else { + // If the query is running in batch mode, we need to create a new StateStore and instantiate + // a temp directory on the executors in mapPartitionsWithIndex. 
+ child.execute().mapPartitionsWithIndex[InternalRow]( +(i, iter) => { + val providerId = { +// lazy creation to initialize tempDirPath once Review Comment: nit: remove the comment to be in sync
Re: [PR] [SPARK-46865][SS] Add Batch Support for TransformWithState Operator [spark]
HeartSaVioR commented on code in PR #44884: URL: https://github.com/apache/spark/pull/44884#discussion_r1480916290 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala: ## @@ -155,23 +163,114 @@ case class TransformWithStateExec( override protected def doExecute(): RDD[InternalRow] = { metrics // force lazy init at driver -child.execute().mapPartitionsWithStateStore[InternalRow]( - getStateInfo, - schemaForKeyRow, - schemaForValueRow, - numColsPrefixKey = 0, - session.sqlContext.sessionState, - Some(session.sqlContext.streams.stateStoreCoordinator), - useColumnFamilies = true -) { - case (store: StateStore, singleIterator: Iterator[InternalRow]) => -val processorHandle = new StatefulProcessorHandleImpl(store, getStateInfo.queryRunId, - keyEncoder) -assert(processorHandle.getHandleState == StatefulProcessorHandleState.CREATED) -statefulProcessor.init(processorHandle, outputMode) - processorHandle.setHandleState(StatefulProcessorHandleState.INITIALIZED) -val result = processDataWithPartition(singleIterator, store, processorHandle) -result +val broadcastedHadoopConf = Review Comment: This is not needed for streaming - let's do this just before L184.
Re: [PR] [SPARK-46865][SS] Add Batch Support for TransformWithState Operator [spark]
ericm-db commented on PR #44884: URL: https://github.com/apache/spark/pull/44884#issuecomment-1931304594 cc @HeartSaVioR
Re: [PR] [SPARK-46865][SS] Add Batch Support for TransformWithState Operator [spark]
ericm-db commented on code in PR #44884: URL: https://github.com/apache/spark/pull/44884#discussion_r1480908781 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala: ## @@ -155,23 +161,112 @@ case class TransformWithStateExec( override protected def doExecute(): RDD[InternalRow] = { metrics // force lazy init at driver -child.execute().mapPartitionsWithStateStore[InternalRow]( - getStateInfo, - schemaForKeyRow, - schemaForValueRow, - numColsPrefixKey = 0, - session.sqlContext.sessionState, - Some(session.sqlContext.streams.stateStoreCoordinator), - useColumnFamilies = true -) { - case (store: StateStore, singleIterator: Iterator[InternalRow]) => -val processorHandle = new StatefulProcessorHandleImpl(store, getStateInfo.queryRunId, - keyEncoder) -assert(processorHandle.getHandleState == StatefulProcessorHandleState.CREATED) -statefulProcessor.init(processorHandle, outputMode) - processorHandle.setHandleState(StatefulProcessorHandleState.INITIALIZED) -val result = processDataWithPartition(singleIterator, store, processorHandle) -result +if (isStreaming) { + child.execute().mapPartitionsWithStateStore[InternalRow]( +getStateInfo, +schemaForKeyRow, +schemaForValueRow, +numColsPrefixKey = 0, +session.sqlContext.sessionState, +Some(session.sqlContext.streams.stateStoreCoordinator), +useColumnFamilies = true + ) { +case (store: StateStore, singleIterator: Iterator[InternalRow]) => + processData(store, singleIterator) + } +} else { + // If the query is running in batch mode, we need to create a new StateStore and instantiate + // a temp directory on the executors in mapPartitionsWithIndex. + child.execute().mapPartitionsWithIndex[InternalRow]( +(i, iter) => { + val providerId = { +// lazy creation to initialize tempDirPath once +lazy val tempDirPath = Utils.createTempDir().getAbsolutePath Review Comment: Removed -- This is an automated message from the Apache Git Service. 
Re: [PR] [SPARK-45599][CORE] Use object equality in OpenHashSet [spark]
cloud-fan commented on code in PR #45036: URL: https://github.com/apache/spark/pull/45036#discussion_r1480904118 ## core/src/test/scala/org/apache/spark/util/collection/OpenHashSetSuite.scala: ## @@ -269,4 +269,35 @@ class OpenHashSetSuite extends SparkFunSuite with Matchers { assert(pos1 == pos2) } } + + test("SPARK-45599: 0.0 and -0.0 are equal but not the same") { Review Comment: This is a bit tricky and it's better if we can find a reference system that defines this semantic. In Spark, `0.0 == -0.0`, and in GROUP BY, 0.0 and -0.0 are considered to be in the same group and normalized to 0.0.
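The distinction in the test name ("equal but not the same") comes from IEEE 754 floating point, and can be illustrated outside Spark. The following is a minimal Python sketch, not the `OpenHashSet` code under review: `bits` and `normalize` are hypothetical helpers introduced here only to show that the two zeros compare equal numerically, differ in their bit patterns, and become bit-identical once normalized to `0.0`.

```python
import math
import struct


def bits(x: float) -> int:
    """Return the raw IEEE 754 binary64 bit pattern of x as an integer."""
    return struct.unpack(">Q", struct.pack(">d", x))[0]


def normalize(x: float) -> float:
    """Hypothetical helper: map both zeros to +0.0, leave other values alone."""
    return 0.0 if x == 0.0 else x


pos, neg = 0.0, -0.0

# Numeric comparison treats the two zeros as equal ...
numerically_equal = pos == neg
# ... but they are distinct values at the bit level (only the sign bit differs).
same_bits = bits(pos) == bits(neg)
# After normalization the two zeros share a single bit pattern.
same_bits_after_normalize = bits(normalize(pos)) == bits(normalize(neg))
# The sign of -0.0 is still observable through copysign.
neg_sign = math.copysign(1.0, neg)
```

A hash set keyed on bit patterns would keep the two zeros apart, while one keyed on numeric equality would merge them; which behavior is wanted is exactly the question raised in the comment above.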
Re: [PR] [SPARK-46996][SQL] Allow AQE coalesce final stage in SQL cached plan [spark]
cloud-fan commented on code in PR #45054: URL: https://github.com/apache/spark/pull/45054#discussion_r1480902456 ## sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala: ## @@ -1630,20 +1630,37 @@ class CachedTableSuite extends QueryTest with SQLTestUtils SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1", SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") { - var finalPlan = "" + var finalPlan: SparkPlanInfo = null val listener = new SparkListener { override def onOtherEvent(event: SparkListenerEvent): Unit = { event match { -case SparkListenerSQLAdaptiveExecutionUpdate(_, physicalPlanDesc, sparkPlanInfo) => +case SparkListenerSQLAdaptiveExecutionUpdate(_, _, sparkPlanInfo) => if (sparkPlanInfo.simpleString.startsWith( "AdaptiveSparkPlan isFinalPlan=true")) { -finalPlan = physicalPlanDesc +finalPlan = sparkPlanInfo } case _ => // ignore other events } } } + def findNodeInSparkPlanInfo(root: SparkPlanInfo, cond: SparkPlanInfo => Boolean): + Option[SparkPlanInfo] = { +if (cond(root)) { + Some(root) +} else { + root.children.flatMap(findNodeInSparkPlanInfo(_, cond)).headOption +} + } + + def cachedFinalStageCoalesced(sparkPlanInfo: SparkPlanInfo): Boolean = { +val inMemoryScanNode = findNodeInSparkPlanInfo(sparkPlanInfo, + _.nodeName.contains("TableCacheQueryStage")) +val resultNode = findNodeInSparkPlanInfo(inMemoryScanNode.get, + _.nodeName.contains("ResultQueryStage")) Review Comment: There is no `ResultQueryStage` in Spark.
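The helper in the diff above is a depth-first search over a plan tree that returns the first node matching a predicate. A rough Python sketch of the same idea follows; `PlanNode` and `find_node` are hypothetical stand-ins for `SparkPlanInfo` and `findNodeInSparkPlanInfo`, and the tree is made up for illustration. It also shows why searching for a node name that never occurs needs a guard: the search yields nothing, so unwrapping the result unconditionally would fail.

```python
from dataclasses import dataclass, field
from typing import Callable, List, Optional


@dataclass
class PlanNode:
    """Hypothetical stand-in for a plan-info node: a name plus child nodes."""
    node_name: str
    children: List["PlanNode"] = field(default_factory=list)


def find_node(root: PlanNode, cond: Callable[[PlanNode], bool]) -> Optional[PlanNode]:
    """Depth-first, pre-order search; returns the first match or None."""
    if cond(root):
        return root
    for child in root.children:
        found = find_node(child, cond)
        if found is not None:
            return found
    return None


# Illustrative tree only; real plans will look different.
plan = PlanNode("AdaptiveSparkPlan", [
    PlanNode("InMemoryTableScan", [
        PlanNode("TableCacheQueryStage", [PlanNode("ShuffleQueryStage")]),
    ]),
])

cache_stage = find_node(plan, lambda n: "TableCacheQueryStage" in n.node_name)
missing = find_node(plan, lambda n: "ResultQueryStage" in n.node_name)

# Guard before descending further instead of assuming the node exists.
has_shuffle_below_cache = (
    cache_stage is not None
    and find_node(cache_stage, lambda n: "ShuffleQueryStage" in n.node_name) is not None
)
```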
Re: [PR] [SPARK-46962][SS][PYTHON] Implement python worker to run python streaming data source [spark]
HeartSaVioR commented on code in PR #45023: URL: https://github.com/apache/spark/pull/45023#discussion_r1480902173 ## python/pyspark/sql/datasource.py: ## @@ -298,6 +320,97 @@ def read(self, partition: InputPartition) -> Iterator[Union[Tuple, Row]]: ... +class DataSourceStreamReader(ABC): Review Comment: Small hint: the majority of the methods we are introducing in the Python streaming data source have corresponding methods in the Scala API. You are OK to copy the same content, with modifications to reflect any differences in the contract.
Re: [PR] [SPARK-46996][SQL] Allow AQE coalesce final stage in SQL cached plan [spark]
cloud-fan commented on code in PR #45054: URL: https://github.com/apache/spark/pull/45054#discussion_r1480901308 ## sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala: ## @@ -1561,11 +1561,11 @@ object SQLConf { .doc("Whether to forcibly enable some optimization rules that can change the output " + "partitioning of a cached query when executing it for caching. If it is set to true, " + "queries may need an extra shuffle to read the cached data. This configuration is " + -"enabled by default. The optimization rules enabled by this configuration " + -s"are ${ADAPTIVE_EXECUTION_ENABLED.key} and ${AUTO_BUCKETED_SCAN_ENABLED.key}.") +"enabled by default. The optimization rule enabled by this configuration " + Review Comment: ```suggestion "disabled by default. The optimization rule enabled by this configuration " + ```
Re: [PR] [SPARK-46996][SQL] Allow AQE coalesce final stage in SQL cached plan [spark]
liuzqt commented on PR #45054: URL: https://github.com/apache/spark/pull/45054#issuecomment-1931275542 @cloud-fan @maryannxue Please help review this change, thanks!
[PR] [SPARK-46996][SQL] Allow AQE coalesce final stage in SQL cached plan [spark]
liuzqt opened a new pull request, #45054: URL: https://github.com/apache/spark/pull/45054

### What changes were proposed in this pull request?

https://github.com/apache/spark/pull/43435 and https://github.com/apache/spark/pull/43760 fix a correctness issue that is triggered when AQE is applied to a cached query plan, specifically when AQE coalesces the final result stage of the cached plan.

The current semantics of `spark.sql.optimizer.canChangeCachedPlanOutputPartitioning` ([source code](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala#L403-L411)):

- when true, we enable AQE, but disable coalescing the final stage (default)
- when false, we disable AQE

But let's revisit the semantics of this config: for the caller, the only thing that matters is whether we change the output partitioning of the cached plan. And we should apply AQE whenever possible. Thus we want to modify the semantics of `spark.sql.optimizer.canChangeCachedPlanOutputPartitioning`:

- when true, we enable AQE and allow coalescing the final stage: this might lead to a perf regression, because it introduces an extra shuffle
- when false, we enable AQE, but disable coalescing the final stage (this is actually the `true` semantics of the old behavior)

Also, to keep the default behavior unchanged, we might want to flip the default value of `spark.sql.optimizer.canChangeCachedPlanOutputPartitioning` to `false`.

### Why are the changes needed?

To allow AQE to coalesce the final stage in a SQL cached plan, and to make the semantics of `spark.sql.optimizer.canChangeCachedPlanOutputPartitioning` more reasonable.

### Does this PR introduce _any_ user-facing change?

### How was this patch tested?

Updated UTs.

### Was this patch authored or co-authored using generative AI tooling?

No
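The old and proposed meanings of the flag, as described in the PR text, can be written down as a small truth table. This is a sketch of the description only, not of `CacheManager`; `CachePlanBehavior`, `old_semantics`, `new_semantics`, and the two default constants are names invented here for illustration.

```python
from typing import NamedTuple


class CachePlanBehavior(NamedTuple):
    """What happens when the cached plan is executed (illustrative model)."""
    aqe_enabled: bool
    coalesce_final_stage: bool


def old_semantics(can_change_partitioning: bool) -> CachePlanBehavior:
    # Old: true -> AQE on but final-stage coalescing off; false -> AQE off entirely.
    return CachePlanBehavior(
        aqe_enabled=can_change_partitioning,
        coalesce_final_stage=False,
    )


def new_semantics(can_change_partitioning: bool) -> CachePlanBehavior:
    # Proposed: AQE is always on; the flag only controls final-stage coalescing.
    return CachePlanBehavior(
        aqe_enabled=True,
        coalesce_final_stage=can_change_partitioning,
    )


OLD_DEFAULT = True   # default before the change, per the PR description
NEW_DEFAULT = False  # proposed flipped default

# Flipping the default keeps the out-of-the-box behavior identical.
default_behavior_unchanged = old_semantics(OLD_DEFAULT) == new_semantics(NEW_DEFAULT)
```

Read this way, the only newly reachable state is "AQE on, final stage coalesced" (`new_semantics(True)`), which is the case the PR description flags as possibly needing an extra shuffle.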
Re: [PR] [SPARK-45599][CORE] Use object equality in OpenHashSet [spark]
HeartSaVioR commented on PR #45036: URL: https://github.com/apache/spark/pull/45036#issuecomment-1931270883 I have no expertise in this area, so I will leave to others the decisions on which version(s) to merge the change into, and on whether we want to safeguard it by introducing an escape hatch for the behavioral change. I can hold off cutting the RC1 tag for this.
Re: [PR] [SPARK-46962][SS][PYTHON] Implement python worker to run python streaming data source [spark]
HeartSaVioR commented on code in PR #45023: URL: https://github.com/apache/spark/pull/45023#discussion_r1480879427 ## sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonStreamingSourceRunner.scala: ## @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.spark.sql.execution.python + +import java.io.{BufferedInputStream, BufferedOutputStream, DataInputStream, DataOutputStream} + +import scala.collection.mutable.ArrayBuffer +import scala.jdk.CollectionConverters._ + +import org.apache.spark.SparkEnv +import org.apache.spark.api.python.{PythonFunction, PythonWorker, PythonWorkerFactory, PythonWorkerUtils, SpecialLengths} +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.BUFFER_SIZE +import org.apache.spark.internal.config.Python.{PYTHON_AUTH_SOCKET_TIMEOUT, PYTHON_USE_DAEMON} +import org.apache.spark.sql.errors.QueryCompilationErrors +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.StructType + +object PythonStreamingSourceRunner { Review Comment: Can we please document the protocol for this runner <-> driver? 
This is a new worker specialized for the Python streaming source - let's not force code readers to figure it out by themselves.
Re: [PR] [SPARK-46526][SQL] Support LIMIT over correlated subqueries where predicates only reference outer table [spark]
cloud-fan closed pull request #44514: [SPARK-46526][SQL] Support LIMIT over correlated subqueries where predicates only reference outer table URL: https://github.com/apache/spark/pull/44514
Re: [PR] [SPARK-46526][SQL] Support LIMIT over correlated subqueries where predicates only reference outer table [spark]
cloud-fan commented on PR #44514: URL: https://github.com/apache/spark/pull/44514#issuecomment-1931233168 thanks, merging to master!
Re: [PR] [SPARK-46975][PS] Move `to_{hdf, feather, stata}` to the fallback list [spark]
HyukjinKwon commented on code in PR #45026: URL: https://github.com/apache/spark/pull/45026#discussion_r1480858948 ## python/pyspark/pandas/frame.py: ## @@ -2648,123 +2648,6 @@ def to_latex( psdf._to_internal_pandas(), self.to_latex, pd.DataFrame.to_latex, args ) -def to_feather( -self, -path: Union[str, IO[str]], -**kwargs: Any, -) -> None: -""" -Write a DataFrame to the binary Feather format. - -.. note:: This method should only be used if the resulting DataFrame is expected - to be small, as all the data is loaded into the driver's memory. - -.. versionadded:: 4.0.0 - -Parameters --- -path : str, path object, file-like object -String, path object (implementing ``os.PathLike[str]``), or file-like -object implementing a binary ``write()`` function. -**kwargs : -Additional keywords passed to :func:`pyarrow.feather.write_feather`. -This includes the `compression`, `compression_level`, `chunksize` -and `version` keywords. - -Examples - ->>> df = ps.DataFrame([[1, 2, 3], [4, 5, 6]]) ->>> df.to_feather("file.feather") # doctest: +SKIP -""" -# Make sure locals() call is at the top of the function so we don't capture local variables. -args = locals() - -return validate_arguments_and_invoke_function( -self._to_internal_pandas(), self.to_feather, pd.DataFrame.to_feather, args -) - -def to_stata( -self, -path: Union[str, IO[str]], -*, -convert_dates: Optional[Dict] = None, -write_index: bool = True, -byteorder: Optional[str] = None, -time_stamp: Optional[datetime.datetime] = None, -data_label: Optional[str] = None, -variable_labels: Optional[Dict] = None, -version: Optional[int] = 114, -convert_strl: Optional[Sequence[Name]] = None, -compression: str = "infer", -storage_options: Optional[str] = None, -value_labels: Optional[Dict] = None, -) -> None: -""" -Export DataFrame object to Stata dta format. - -.. note:: This method should only be used if the resulting DataFrame is expected - to be small, as all the data is loaded into the driver's memory. - -.. 
versionadded:: 4.0.0 - -Parameters --- -path : str, path object, or buffer -String, path object (implementing ``os.PathLike[str]``), or file-like -object implementing a binary ``write()`` function. -convert_dates : dict -Dictionary mapping columns containing datetime types to stata -internal format to use when writing the dates. Options are 'tc', -'td', 'tm', 'tw', 'th', 'tq', 'ty'. Column can be either an integer -or a name. Datetime columns that do not have a conversion type -specified will be converted to 'tc'. Raises NotImplementedError if -a datetime column has timezone information. -write_index : bool -Write the index to Stata dataset. -byteorder : str -Can be ">", "<", "little", or "big". default is `sys.byteorder`. -time_stamp : datetime -A datetime to use as file creation date. Default is the current -time. -data_label : str, optional -A label for the data set. Must be 80 characters or smaller. -variable_labels : dict -Dictionary containing columns as keys and variable labels as -values. Each label must be 80 characters or smaller. -version : {{114, 117, 118, 119, None}}, default 114 -Version to use in the output dta file. Set to None to let pandas -decide between 118 or 119 formats depending on the number of -columns in the frame. Version 114 can be read by Stata 10 and -later. Version 117 can be read by Stata 13 or later. Version 118 -is supported in Stata 14 and later. Version 119 is supported in -Stata 15 and later. Version 114 limits string variables to 244 -characters or fewer while versions 117 and later allow strings -with lengths up to 2,000,000 characters. Versions 118 and 119 -support Unicode characters, and version 119 supports more than -32,767 variables. -convert_strl : list, optional -List of column names to convert to string columns to Stata StrL -format. Only available if version is 117. Storing strings in the -StrL format can produce smaller dta files if strings have more than -8 characters and values are repeated. 
-value_labels : dict of dicts -Dictionary containing columns as keys and
Re: [PR] [SPARK-46865][SS] Add Batch Support for TransformWithState Operator [spark]
HeartSaVioR commented on code in PR #44884: URL: https://github.com/apache/spark/pull/44884#discussion_r1480841542 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala: ## @@ -155,23 +161,112 @@ case class TransformWithStateExec( override protected def doExecute(): RDD[InternalRow] = { metrics // force lazy init at driver -child.execute().mapPartitionsWithStateStore[InternalRow]( - getStateInfo, - schemaForKeyRow, - schemaForValueRow, - numColsPrefixKey = 0, - session.sqlContext.sessionState, - Some(session.sqlContext.streams.stateStoreCoordinator), - useColumnFamilies = true -) { - case (store: StateStore, singleIterator: Iterator[InternalRow]) => -val processorHandle = new StatefulProcessorHandleImpl(store, getStateInfo.queryRunId, - keyEncoder) -assert(processorHandle.getHandleState == StatefulProcessorHandleState.CREATED) -statefulProcessor.init(processorHandle, outputMode) - processorHandle.setHandleState(StatefulProcessorHandleState.INITIALIZED) -val result = processDataWithPartition(singleIterator, store, processorHandle) -result +if (isStreaming) { + child.execute().mapPartitionsWithStateStore[InternalRow]( +getStateInfo, +schemaForKeyRow, +schemaForValueRow, +numColsPrefixKey = 0, +session.sqlContext.sessionState, +Some(session.sqlContext.streams.stateStoreCoordinator), +useColumnFamilies = true + ) { +case (store: StateStore, singleIterator: Iterator[InternalRow]) => + processData(store, singleIterator) + } +} else { + // If the query is running in batch mode, we need to create a new StateStore and instantiate + // a temp directory on the executors in mapPartitionsWithIndex. + child.execute().mapPartitionsWithIndex[InternalRow]( +(i, iter) => { + val providerId = { +// lazy creation to initialize tempDirPath once +lazy val tempDirPath = Utils.createTempDir().getAbsolutePath Review Comment: Please see the question again - my question was "will this be evaluated once per executor **after serde**?". If so, this is a good optimization; otherwise `lazy val` does nothing and just confuses people.
Re: [PR] [SPARK-46865][SS] Add Batch Support for TransformWithState Operator [spark]
anishshri-db commented on code in PR #44884: URL: https://github.com/apache/spark/pull/44884#discussion_r1480850291 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala: ## @@ -155,23 +161,112 @@ case class TransformWithStateExec( override protected def doExecute(): RDD[InternalRow] = { metrics // force lazy init at driver -child.execute().mapPartitionsWithStateStore[InternalRow]( - getStateInfo, - schemaForKeyRow, - schemaForValueRow, - numColsPrefixKey = 0, - session.sqlContext.sessionState, - Some(session.sqlContext.streams.stateStoreCoordinator), - useColumnFamilies = true -) { - case (store: StateStore, singleIterator: Iterator[InternalRow]) => -val processorHandle = new StatefulProcessorHandleImpl(store, getStateInfo.queryRunId, - keyEncoder) -assert(processorHandle.getHandleState == StatefulProcessorHandleState.CREATED) -statefulProcessor.init(processorHandle, outputMode) - processorHandle.setHandleState(StatefulProcessorHandleState.INITIALIZED) -val result = processDataWithPartition(singleIterator, store, processorHandle) -result +if (isStreaming) { + child.execute().mapPartitionsWithStateStore[InternalRow]( +getStateInfo, +schemaForKeyRow, +schemaForValueRow, +numColsPrefixKey = 0, +session.sqlContext.sessionState, +Some(session.sqlContext.streams.stateStoreCoordinator), +useColumnFamilies = true + ) { +case (store: StateStore, singleIterator: Iterator[InternalRow]) => + processData(store, singleIterator) + } +} else { + // If the query is running in batch mode, we need to create a new StateStore and instantiate + // a temp directory on the executors in mapPartitionsWithIndex. + child.execute().mapPartitionsWithIndex[InternalRow]( +(i, iter) => { + val providerId = { +// lazy creation to initialize tempDirPath once +lazy val tempDirPath = Utils.createTempDir().getAbsolutePath Review Comment: Ah ok - gotcha, thx! Yeah, agreed - if it's not evaluated once, we might as well remove the `lazy` portion. Maybe we should just remove it anyway - it will be easier to read, I feel.
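The scoping question in this thread can be illustrated outside Spark. The following is a minimal sketch, not the PR's code: `createTempDirStub`, `localLazy`, `sharedLazy`, and `Holder` are hypothetical names, and the stub only counts calls instead of touching the filesystem. It shows that a `lazy val` declared inside a function body is a fresh local on every invocation, while a `lazy val` member of a singleton object is initialized at most once per JVM.

```scala
object LazyValScope {
  // Hypothetical stand-in for Utils.createTempDir(); it only counts calls.
  private var calls = 0
  def callCount: Int = calls
  private def createTempDirStub(): String = {
    calls += 1
    s"/tmp/stub-$calls"
  }

  // Local lazy val: a new one exists for every invocation of the function,
  // so the initializer runs once per call (in the PR: once per partition).
  val localLazy: Int => String = { i =>
    lazy val tempDirPath = createTempDirStub()
    s"$tempDirPath/partition-$i"
  }

  // Member lazy val of a singleton object: initialized at most once per JVM,
  // no matter how many times the function below is invoked.
  object Holder {
    lazy val tempDirPath: String = createTempDirStub()
  }
  val sharedLazy: Int => String = i => s"${Holder.tempDirPath}/partition-$i"
}
```

Under these assumptions, calling `localLazy` for two partitions runs the initializer twice, so the `lazy` modifier buys nothing there; only the object-level variant gives a once-per-JVM guarantee.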
Re: [PR] [SPARK-46641][SS] Add maxBytesPerTrigger threshold [spark]
MaxNevermind commented on PR #44636: URL: https://github.com/apache/spark/pull/44636#issuecomment-1931178064 Pushed another commit. One issue was resolved. @viirya, please check the solution for the last remaining issue raised above.
Re: [PR] [SPARK-46641][SS] Add maxBytesPerTrigger threshold [spark]
MaxNevermind commented on code in PR #44636: URL: https://github.com/apache/spark/pull/44636#discussion_r1480827987 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala: ## @@ -166,6 +197,25 @@ class FileStreamSource( // implies "sourceOptions.latestFirst = true" which we want to refresh the list per batch (newFiles.take(files.maxFiles()), null) + case files: ReadMaxBytes if !sourceOptions.latestFirst => +// we can cache and reuse remaining fetched list of files in further batches +val (FilesSplit(bFiles, _), FilesSplit(usFiles, rSize)) = + takeFilesUntilMax(newFiles, files.maxBytes()) +if (rSize < (files.maxBytes() * DISCARD_UNSEEN_FILES_RATIO).toLong) { Review Comment: @viirya I've converted the BigInt to Double, so this is now a comparison of two Doubles; let me know if that is acceptable. We could also do:
```
if (rSize < BigDecimal(files.maxBytes() * DISCARD_UNSEEN_INPUT_RATIO)) {
```
The BigInt on the left would be converted to BigDecimal implicitly by the compiler.
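For readers weighing the two options above, here is a minimal sketch of the BigDecimal variant with the conversion written out explicitly, so it does not depend on any implicit view being in scope. The object name, the method name, and the ratio value `0.2` are assumptions for illustration; the PR's actual constant is not shown in this thread.

```scala
object ThresholdCheck {
  // Assumed ratio, for illustration only.
  val DiscardRatio: Double = 0.2

  // Compare an arbitrarily large byte count against a fraction of maxBytes
  // without narrowing the BigInt to Long or Double first.
  def belowThreshold(remaining: BigInt, maxBytes: Long): Boolean =
    BigDecimal(remaining) < BigDecimal(maxBytes) * BigDecimal(DiscardRatio)
}
```

Keeping both sides in BigDecimal avoids overflow when the accumulated size exceeds `Long.MaxValue`, at the cost of slower arithmetic than a plain Double comparison.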
Re: [PR] [SPARK-46812][SQL][PYTHON] Make mapInPandas / mapInArrow support ResourceProfile [spark]
wbo4958 commented on code in PR #44852: URL: https://github.com/apache/spark/pull/44852#discussion_r1480821808 ## python/pyspark/sql/pandas/map_ops.py: ## @@ -15,9 +15,14 @@ # limitations under the License. # import sys -from typing import Union, TYPE_CHECKING +from typing import Union, TYPE_CHECKING, Optional + +from py4j.java_gateway import JavaObject + +from pyspark.resource.requests import ExecutorResourceRequests, TaskResourceRequests Review Comment: Thx ## python/pyspark/sql/pandas/map_ops.py: ## @@ -175,6 +196,11 @@ def mapInArrow( .. versionadded: 3.5.0 +profile : :class:`pyspark.resource.ResourceProfile`. The optional ResourceProfile +to be used for mapInPandas. Review Comment: My bad, Thx very much.
Re: [PR] [SPARK-46987][CONNECT] `ProtoUtils.abbreviate` avoid unnecessary `setField` operation [spark]
LuciferYang commented on PR #45045: URL: https://github.com/apache/spark/pull/45045#issuecomment-1931141290 All tests passed. Merged into master for Spark 4.0. Thanks @zhengruifeng @dongjoon-hyun @xinrong-meng
Re: [PR] [SPARK-46987][CONNECT] `ProtoUtils.abbreviate` avoid unnecessary `setField` operation [spark]
LuciferYang closed pull request #45045: [SPARK-46987][CONNECT] `ProtoUtils.abbreviate` avoid unnecessary `setField` operation URL: https://github.com/apache/spark/pull/45045
Re: [PR] [SPARK-46975][PS] Move `to_{hdf, feather, stata}` to the fallback list [spark]
zhengruifeng commented on code in PR #45026: URL: https://github.com/apache/spark/pull/45026#discussion_r1480807590 ## python/pyspark/pandas/frame.py: ## @@ -2648,123 +2648,6 @@ def to_latex( psdf._to_internal_pandas(), self.to_latex, pd.DataFrame.to_latex, args ) -def to_feather( -self, -path: Union[str, IO[str]], -**kwargs: Any, -) -> None: -""" -Write a DataFrame to the binary Feather format. - -.. note:: This method should only be used if the resulting DataFrame is expected - to be small, as all the data is loaded into the driver's memory. - -.. versionadded:: 4.0.0 - -Parameters --- -path : str, path object, file-like object -String, path object (implementing ``os.PathLike[str]``), or file-like -object implementing a binary ``write()`` function. -**kwargs : -Additional keywords passed to :func:`pyarrow.feather.write_feather`. -This includes the `compression`, `compression_level`, `chunksize` -and `version` keywords. - -Examples - ->>> df = ps.DataFrame([[1, 2, 3], [4, 5, 6]]) ->>> df.to_feather("file.feather") # doctest: +SKIP -""" -# Make sure locals() call is at the top of the function so we don't capture local variables. -args = locals() - -return validate_arguments_and_invoke_function( -self._to_internal_pandas(), self.to_feather, pd.DataFrame.to_feather, args -) - -def to_stata( -self, -path: Union[str, IO[str]], -*, -convert_dates: Optional[Dict] = None, -write_index: bool = True, -byteorder: Optional[str] = None, -time_stamp: Optional[datetime.datetime] = None, -data_label: Optional[str] = None, -variable_labels: Optional[Dict] = None, -version: Optional[int] = 114, -convert_strl: Optional[Sequence[Name]] = None, -compression: str = "infer", -storage_options: Optional[str] = None, -value_labels: Optional[Dict] = None, -) -> None: -""" -Export DataFrame object to Stata dta format. - -.. note:: This method should only be used if the resulting DataFrame is expected - to be small, as all the data is loaded into the driver's memory. - -.. 
versionadded:: 4.0.0 - -Parameters --- -path : str, path object, or buffer -String, path object (implementing ``os.PathLike[str]``), or file-like -object implementing a binary ``write()`` function. -convert_dates : dict -Dictionary mapping columns containing datetime types to stata -internal format to use when writing the dates. Options are 'tc', -'td', 'tm', 'tw', 'th', 'tq', 'ty'. Column can be either an integer -or a name. Datetime columns that do not have a conversion type -specified will be converted to 'tc'. Raises NotImplementedError if -a datetime column has timezone information. -write_index : bool -Write the index to Stata dataset. -byteorder : str -Can be ">", "<", "little", or "big". default is `sys.byteorder`. -time_stamp : datetime -A datetime to use as file creation date. Default is the current -time. -data_label : str, optional -A label for the data set. Must be 80 characters or smaller. -variable_labels : dict -Dictionary containing columns as keys and variable labels as -values. Each label must be 80 characters or smaller. -version : {{114, 117, 118, 119, None}}, default 114 -Version to use in the output dta file. Set to None to let pandas -decide between 118 or 119 formats depending on the number of -columns in the frame. Version 114 can be read by Stata 10 and -later. Version 117 can be read by Stata 13 or later. Version 118 -is supported in Stata 14 and later. Version 119 is supported in -Stata 15 and later. Version 114 limits string variables to 244 -characters or fewer while versions 117 and later allow strings -with lengths up to 2,000,000 characters. Versions 118 and 119 -support Unicode characters, and version 119 supports more than -32,767 variables. -convert_strl : list, optional -List of column names to convert to string columns to Stata StrL -format. Only available if version is 117. Storing strings in the -StrL format can produce smaller dta files if strings have more than -8 characters and values are repeated. 
-value_labels : dict of dicts -Dictionary containing columns as keys and
Re: [PR] [SPARK-45527][CORE] Use fraction to do the resource calculation [spark]
srowen commented on code in PR #44690: URL: https://github.com/apache/spark/pull/44690#discussion_r1480801713 ## core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala: ## @@ -20,6 +20,49 @@ package org.apache.spark.resource import scala.collection.mutable import org.apache.spark.SparkException +import org.apache.spark.resource.ResourceAmountUtils.ONE_ENTIRE_RESOURCE + +private[spark] object ResourceAmountUtils { + /** + * Using "double" to do the resource calculation may encounter a problem of precision loss. Eg + * + * scala> val taskAmount = 1.0 / 9 + * taskAmount: Double = 0. + * + * scala> var total = 1.0 + * total: Double = 1.0 + * + * scala> for (i <- 1 to 9 ) { + * | if (total >= taskAmount) { + * | total -= taskAmount + * | println(s"assign $taskAmount for task $i, total left: $total") + * | } else { + * | println(s"ERROR Can't assign $taskAmount for task $i, total left: $total") + * | } + * | } + * assign 0. for task 1, total left: 0. + * assign 0. for task 2, total left: 0. + * assign 0. for task 3, total left: 0.6665 + * assign 0. for task 4, total left: 0.5554 + * assign 0. for task 5, total left: 0.44425 + * assign 0. for task 6, total left: 0.33315 + * assign 0. for task 7, total left: 0.22204 + * assign 0. for task 8, total left: 0.11094 + * ERROR Can't assign 0. for task 9, total left: 0.11094 + * + * So we multiply ONE_ENTIRE_RESOURCE to convert the double to long to avoid this limitation. + * Double can display up to 16 decimal places, so we set the factor to + * 10, 000, 000, 000, 000, 000L. + */ + final val ONE_ENTIRE_RESOURCE: Long = 1L Review Comment: No. The point of BigDecimal is that you never have to use double or float. This isn't actually doing math with BigDecimal.
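The reviewer's point about keeping Double out of the calculation entirely can be sketched as follows. This is an illustration under stated assumptions, not code from the PR: the object and method names are hypothetical, and the amounts are built from strings so that no Double is involved at any step.

```scala
object DecimalAmounts {
  // Ten shares of 0.1 accumulated as Doubles: binary rounding error creeps in.
  def sumAsDouble(): Double = Seq.fill(10)(0.1).sum

  // The same shares kept as BigDecimal, constructed from strings,
  // so the arithmetic is exact in base 10.
  def sumAsBigDecimal(): BigDecimal = Seq.fill(10)(BigDecimal("0.1")).sum
}
```

Note that this only helps for amounts with a finite decimal expansion. A share such as 1/9 is not exactly representable in decimal either, so some rounding policy is still needed for that case.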
Re: [PR] [SPARK-45527][CORE] Use fraction to do the resource calculation [spark]
wbo4958 commented on code in PR #44690: URL: https://github.com/apache/spark/pull/44690#discussion_r1480798101 ## core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala: ## @@ -20,6 +20,49 @@ package org.apache.spark.resource import scala.collection.mutable import org.apache.spark.SparkException +import org.apache.spark.resource.ResourceAmountUtils.ONE_ENTIRE_RESOURCE + +private[spark] object ResourceAmountUtils { + /** + * Using "double" to do the resource calculation may encounter a problem of precision loss. Eg + * + * scala> val taskAmount = 1.0 / 9 + * taskAmount: Double = 0. + * + * scala> var total = 1.0 + * total: Double = 1.0 + * + * scala> for (i <- 1 to 9 ) { + * | if (total >= taskAmount) { + * | total -= taskAmount + * | println(s"assign $taskAmount for task $i, total left: $total") + * | } else { + * | println(s"ERROR Can't assign $taskAmount for task $i, total left: $total") + * | } + * | } + * assign 0. for task 1, total left: 0. + * assign 0. for task 2, total left: 0. + * assign 0. for task 3, total left: 0.6665 + * assign 0. for task 4, total left: 0.5554 + * assign 0. for task 5, total left: 0.44425 + * assign 0. for task 6, total left: 0.33315 + * assign 0. for task 7, total left: 0.22204 + * assign 0. for task 8, total left: 0.11094 + * ERROR Can't assign 0. for task 9, total left: 0.11094 + * + * So we multiply ONE_ENTIRE_RESOURCE to convert the double to long to avoid this limitation. + * Double can display up to 16 decimal places, so we set the factor to + * 10, 000, 000, 000, 000, 000L. + */ + final val ONE_ENTIRE_RESOURCE: Long = 1L Review Comment: Hi @srowen, using Long is essentially the same as using BigDecimal. The default scale of BigDecimal is 16, so this PR chooses `ONE_ENTIRE_RESOURCE = 1E16.toLong`:
```scala
scala> val ONE_ENTIRE_RESOURCE: Long = 1E16.toLong
     | val taskAmount = 0.3334
     |
     | val usingLong = (taskAmount * ONE_ENTIRE_RESOURCE).toLong
     |
     | val bigDec = BigDecimal(taskAmount).toDouble
val ONE_ENTIRE_RESOURCE: Long = 1
val taskAmount: Double = 0.3334
val usingLong: Long = 3334
val bigDec: Double = 0.3334
```
So if we need to ensure the input is small enough (<1/n), we can set the scale to something like 14 for BigDecimal and, to stay aligned with BigDecimal, set `ONE_ENTIRE_RESOURCE = 1E14.toLong`:
```scala
scala> import scala.math.BigDecimal.RoundingMode
     |
     | val ONE_ENTIRE_RESOURCE: Long = 1E14.toLong
     | val taskAmount = 0.3334
     |
     | val usingLong = (taskAmount * ONE_ENTIRE_RESOURCE).toLong
     |
     | val bigDec = BigDecimal(taskAmount).setScale(14, RoundingMode.DOWN).toDouble
import scala.math.BigDecimal.RoundingMode
val ONE_ENTIRE_RESOURCE: Long = 100
val taskAmount: Double = 0.3334
val usingLong: Long = 33
val bigDec: Double = 0.33
```
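The Long-scaling approach described in this message can be sketched end to end. This is a minimal illustration, assuming a scale factor of `1E16.toLong` as stated above; `OneEntireResource` mirrors the PR's `ONE_ENTIRE_RESOURCE`, while the object and method names are hypothetical. Each method returns how many of `tasks` equal shares of one resource could be assigned.

```scala
object FractionalResources {
  // Mirrors ONE_ENTIRE_RESOURCE from the discussion (assumed to be 10^16).
  val OneEntireResource: Long = 1E16.toLong

  // Bookkeeping in Double: repeated subtraction accumulates rounding error.
  def assignWithDouble(tasks: Int): Int = {
    val taskAmount = 1.0 / tasks
    var total = 1.0
    var assigned = 0
    for (_ <- 1 to tasks) {
      if (total >= taskAmount) {
        total -= taskAmount
        assigned += 1
      }
    }
    assigned
  }

  // Bookkeeping in Long: the share is converted once, then all
  // additions and subtractions are exact integer arithmetic.
  def assignWithLong(tasks: Int): Int = {
    val taskAmount = ((1.0 / tasks) * OneEntireResource).toLong
    var total = OneEntireResource
    var assigned = 0
    for (_ <- 1 to tasks) {
      if (total >= taskAmount) {
        total -= taskAmount
        assigned += 1
      }
    }
    assigned
  }
}
```

With nine tasks, the Double version runs out before the ninth assignment, matching the scaladoc in the diff, while the Long version assigns all nine because the truncated share times nine never exceeds the scaled total.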
Re: [PR] [SPARK-46812][SQL][PYTHON] Make mapInPandas / mapInArrow support ResourceProfile [spark]
HyukjinKwon commented on code in PR #44852: URL: https://github.com/apache/spark/pull/44852#discussion_r1480797674 ## python/pyspark/sql/pandas/map_ops.py: ## @@ -15,9 +15,14 @@ # limitations under the License. # import sys -from typing import Union, TYPE_CHECKING +from typing import Union, TYPE_CHECKING, Optional + +from py4j.java_gateway import JavaObject + +from pyspark.resource.requests import ExecutorResourceRequests, TaskResourceRequests Review Comment: ```suggestion ```
Re: [PR] [SPARK-46812][SQL][PYTHON] Make mapInPandas / mapInArrow support ResourceProfile [spark]
HyukjinKwon commented on code in PR #44852: URL: https://github.com/apache/spark/pull/44852#discussion_r1480797398 ## python/pyspark/sql/tests/test_resources.py: ## @@ -0,0 +1,104 @@ +# Review Comment: This has to be added into `dev/sparktestsupport/modules.py`
Re: [PR] [SPARK-46812][SQL][PYTHON] Make mapInPandas / mapInArrow support ResourceProfile [spark]
HyukjinKwon commented on code in PR #44852: URL: https://github.com/apache/spark/pull/44852#discussion_r1480797018 ## python/pyspark/sql/pandas/map_ops.py: ## @@ -175,6 +196,11 @@ def mapInArrow( .. versionadded: 3.5.0 +profile : :class:`pyspark.resource.ResourceProfile`. The optional ResourceProfile +to be used for mapInPandas. Review Comment: ```suggestion to be used for mapInArrow. ```
Re: [PR] [SPARK-43829][CONNECT] Improve SparkConnectPlanner by reuse Dataset and avoid construct new Dataset [spark]
beliefer commented on code in PR #43473: URL: https://github.com/apache/spark/pull/43473#discussion_r1480770423 ## connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala: ## @@ -115,6 +115,49 @@ class SparkConnectPlanner( private lazy val pythonExec = sys.env.getOrElse("PYSPARK_PYTHON", sys.env.getOrElse("PYSPARK_DRIVER_PYTHON", "python3")) + // Some relation transform need to create Dataset, then get the logical plan from the Dataset. + // This method used to reuse the Dataset instead to discard it. + def transformRelationAsDataset(rel: proto.Relation): Dataset[Row] = { Review Comment: > TBH I want to move away from constructing Datasets wholesale. In many cases there is no real need, and it is also expensive to do. Yes. The current implementation creates many duplicate Datasets and then discards them. We should reuse these Datasets to reduce the overhead.
Re: [PR] [SPARK-43829][CONNECT] Improve SparkConnectPlanner by reuse Dataset and avoid construct new Dataset [spark]
beliefer commented on code in PR #43473: URL: https://github.com/apache/spark/pull/43473#discussion_r1480770588 ## connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala: ## @@ -115,6 +115,49 @@ class SparkConnectPlanner( private lazy val pythonExec = sys.env.getOrElse("PYSPARK_PYTHON", sys.env.getOrElse("PYSPARK_DRIVER_PYTHON", "python3")) + // Some relation transform need to create Dataset, then get the logical plan from the Dataset. + // This method used to reuse the Dataset instead to discard it. + def transformRelationAsDataset(rel: proto.Relation): Dataset[Row] = { Review Comment: > Is this something you want to work on? Yes.
Re: [PR] [SPARK-46963] Verify AQE is not enabled for Structured Streaming [spark]
bogao007 commented on PR #45005: URL: https://github.com/apache/spark/pull/45005#issuecomment-1931071420 Closing this PR; we will not make this change.
Re: [PR] [SPARK-46963] Verify AQE is not enabled for Structured Streaming [spark]
bogao007 closed pull request #45005: [SPARK-46963] Verify AQE is not enabled for Structured Streaming URL: https://github.com/apache/spark/pull/45005
Re: [PR] [SPARK-46984][PYTHON] Remove pyspark.copy_func [spark]
HyukjinKwon closed pull request #45042: [SPARK-46984][PYTHON] Remove pyspark.copy_func URL: https://github.com/apache/spark/pull/45042
Re: [PR] [SPARK-46984][PYTHON] Remove pyspark.copy_func [spark]
HyukjinKwon commented on PR #45042: URL: https://github.com/apache/spark/pull/45042#issuecomment-1931067075 Merged to master.
Re: [PR] [WIP][SPARK-46947][CORE] Delay memory manager initialization until Driver plugin is loaded [spark]
sunchao commented on code in PR #45052: URL: https://github.com/apache/spark/pull/45052#discussion_r1480751365 ## core/src/main/scala/org/apache/spark/SparkEnv.scala: ## @@ -67,7 +67,6 @@ class SparkEnv ( val blockManager: BlockManager, val securityManager: SecurityManager, val metricsSystem: MetricsSystem, -val memoryManager: MemoryManager, Review Comment: Yes, #43627 already requires an API change, so this just builds on top of it.
Re: [PR] [WIP][SPARK-46947][CORE] Delay memory manager initialization until Driver plugin is loaded [spark]
dongjoon-hyun commented on code in PR #45052: URL: https://github.com/apache/spark/pull/45052#discussion_r1480675486 ## core/src/main/scala/org/apache/spark/SparkEnv.scala: ## @@ -67,7 +67,6 @@ class SparkEnv ( val blockManager: BlockManager, val securityManager: SecurityManager, val metricsSystem: MetricsSystem, -val memoryManager: MemoryManager, Review Comment: Ack. I also pinged on #43627 . If this is broken already at Spark 4.0.0, we don't need to care much. - https://github.com/apache/spark/pull/43627#pullrequestreview-1866555431
Re: [PR] [SS][SPARK-46928] Add support for ListState in Arbitrary State API v2. [spark]
sahnib commented on code in PR #44961: URL: https://github.com/apache/spark/pull/44961#discussion_r1480673515 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala: ## @@ -222,6 +232,31 @@ class StateStoreChangelogWriterV2( size += 1 } + override def merge(key: Array[Byte], value: Array[Byte]): Unit = { Review Comment: Thanks for bringing this up. I have a follow-up question here: should we also support the default column family in changelog v2? Is there a reason to restrict it to only column families?
Re: [PR] [SS][SPARK-46928] Add support for ListState in Arbitrary State API v2. [spark]
sahnib commented on code in PR #44961: URL: https://github.com/apache/spark/pull/44961#discussion_r1480674682 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala: ## @@ -57,7 +57,10 @@ package object state { sessionState: SessionState, storeCoordinator: Option[StateStoreCoordinatorRef], useColumnFamilies: Boolean = false, -extraOptions: Map[String, String] = Map.empty)( +extraOptions: Map[String, String] = Map.empty, +// TODO: refactor using the boolean parameter for choosing stateful encoder properties Review Comment: Done.
Re: [PR] [WIP][SPARK-46947][CORE] Delay memory manager initialization until Driver plugin is loaded [spark]
sunchao commented on PR #45052: URL: https://github.com/apache/spark/pull/45052#issuecomment-1930961472 I'll add some tests later. Marking as a draft for now to run through all existing tests.
Re: [PR] [WIP][SPARK-46947][CORE] Delay memory manager initialization until Driver plugin is loaded [spark]
sunchao commented on code in PR #45052: URL: https://github.com/apache/spark/pull/45052#discussion_r1480670318 ## core/src/main/scala/org/apache/spark/SparkEnv.scala: ## @@ -67,7 +67,6 @@ class SparkEnv ( val blockManager: BlockManager, val securityManager: SecurityManager, val metricsSystem: MetricsSystem, -val memoryManager: MemoryManager, Review Comment: Hmm actually it might be a bit difficult since we are changing `memoryManager` from a `val` to a method.
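Note on the `val`-to-method change discussed in this thread: the following is a minimal sketch, assuming simplified hypothetical stand-ins (`Env`, `MemoryManager`, and an `initializeMemoryManager` with this signature) rather than Spark's real `SparkEnv`, of how an auxiliary constructor could keep the earlier signature while the member becomes a method over late-initialized state. It is not the code from either PR.

```scala
// Hypothetical, simplified stand-ins; these are NOT Spark's real classes.
final class MemoryManager(val name: String)

class Env(val executorId: String) {
  // Filled in later (for example, once driver plugins have been loaded).
  private var _memoryManager: MemoryManager = null

  // Earlier signature kept as an auxiliary constructor so old call sites still compile.
  def this(executorId: String, memoryManager: MemoryManager) = {
    this(executorId)
    _memoryManager = memoryManager
  }

  // Formerly a constructor `val`; now a method that reads the late-initialized field.
  def memoryManager: MemoryManager = _memoryManager

  // Assumed initializer name and shape, for illustration only.
  def initializeMemoryManager(mm: MemoryManager): Unit = {
    require(_memoryManager == null, "memory manager already initialized")
    _memoryManager = mm
  }
}
```

Callers that only read `env.memoryManager` compile unchanged under this sketch; code that relied on it being a stable `val` (for example `import env.memoryManager._`) would not, which may be part of the difficulty mentioned above.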
Re: [PR] [SS][SPARK-46928] Add support for ListState in Arbitrary State API v2. [spark]
sahnib commented on code in PR #44961: URL: https://github.com/apache/spark/pull/44961#discussion_r1480670123 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala: ## @@ -215,7 +253,13 @@ private[sql] class RocksDBStateStoreProvider (keySchema.length > numColsPrefixKey), "The number of columns in the key must be " + "greater than the number of columns for prefix key!") -this.encoder = RocksDBStateEncoder.getEncoder(keySchema, valueSchema, numColsPrefixKey) +if (useMultipleValuesPerKey) { Review Comment: Yes, we have the option to get rid of this check post the key/value encoder refactoring. We can keep it if we want to support multiple values for a single key in default column family. It would make the supported operations consistent b/w default and other column families. But it can be added later, I am not strongly opinionated on this atm.
Re: [PR] [WIP][SPARK-46947][CORE] Delay memory manager initialization until Driver plugin is loaded [spark]
sunchao commented on code in PR #45052: URL: https://github.com/apache/spark/pull/45052#discussion_r1480668776 ## core/src/main/scala/org/apache/spark/SparkEnv.scala: ## @@ -67,7 +67,6 @@ class SparkEnv ( val blockManager: BlockManager, val securityManager: SecurityManager, val metricsSystem: MetricsSystem, -val memoryManager: MemoryManager, Review Comment: Sure. I'm pretty much doing something very similar to this PR https://github.com/apache/spark/pull/43627 but happy to add a new constructor.
Re: [PR] [SS][SPARK-46928] Add support for ListState in Arbitrary State API v2. [spark]
sahnib commented on code in PR #44961: URL: https://github.com/apache/spark/pull/44961#discussion_r1480667215 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala: ## @@ -249,4 +259,109 @@ class NoPrefixKeyStateEncoder(keySchema: StructType, valueSchema: StructType) override def encodePrefixKey(prefixKey: UnsafeRow): Array[Byte] = { throw new IllegalStateException("This encoder doesn't support prefix key!") } + + override def supportsMultipleValuesPerKey: Boolean = false + + override def decodeValues(valueBytes: Array[Byte]): Iterator[UnsafeRow] = { +throw new UnsupportedOperationException("encoder does not support multiple values per key") + } +} + +/** + * Supports encoding multiple values per key in RocksDB. + * A single value is encoded in the format below, where first value is number of bytes + * in actual encodedUnsafeRow followed by the encoded value itself. + * + * |---size(bytes)--|--unsafeRowEncodedBytes--| + * + * Multiple values are separated by a delimiter character. + * + * This encoder supports RocksDB StringAppendOperator merge operator. Values encoded can be + * merged in RocksDB using merge operation, and all merged values can be read using decodeValues + * operation. 
+ */ +class MultiValuedStateEncoder(keySchema: StructType, valueSchema: StructType) + extends RocksDBStateEncoder with Logging { + + import RocksDBStateEncoder._ + + // Reusable objects + private val keyRow = new UnsafeRow(keySchema.size) + private val valueRow = new UnsafeRow(valueSchema.size) + private val rowTuple = new UnsafeRowPair() + + override def supportPrefixKeyScan: Boolean = false + + override def encodePrefixKey(prefixKey: UnsafeRow): Array[Byte] = { +throw new IllegalStateException("This encoder doesn't support prefix key!") + } + + override def extractPrefixKey(key: UnsafeRow): UnsafeRow = { +throw new IllegalStateException("This encoder doesn't support prefix key!") + } + + override def encodeKey(row: UnsafeRow): Array[Byte] = { +encodeUnsafeRow(row) + } + + override def encodeValue(row: UnsafeRow): Array[Byte] = { +val bytes = encodeUnsafeRow(row) +val numBytes = bytes.length + +val encodedBytes = new Array[Byte](java.lang.Integer.BYTES + bytes.length) +Platform.putInt(encodedBytes, Platform.BYTE_ARRAY_OFFSET, numBytes) +Platform.copyMemory(bytes, Platform.BYTE_ARRAY_OFFSET, + encodedBytes, java.lang.Integer.BYTES + Platform.BYTE_ARRAY_OFFSET, bytes.length) + +encodedBytes + } + + override def decodeKey(keyBytes: Array[Byte]): UnsafeRow = { +decodeToUnsafeRow(keyBytes, keyRow) + } + + override def decodeValue(valueBytes: Array[Byte]): UnsafeRow = { +if (valueBytes == null) { + null +} else { + val numBytes = Platform.getInt(valueBytes, Platform.BYTE_ARRAY_OFFSET) + val encodedValue = new Array[Byte](numBytes) + Platform.copyMemory(valueBytes, java.lang.Integer.BYTES + Platform.BYTE_ARRAY_OFFSET, +encodedValue, Platform.BYTE_ARRAY_OFFSET, numBytes) + decodeToUnsafeRow(encodedValue, valueRow) +} + } + + override def decodeValues(valueBytes: Array[Byte]): Iterator[UnsafeRow] = { +if (valueBytes == null) { + Seq().iterator +} else { + new Iterator[UnsafeRow] { +private var pos: Int = Platform.BYTE_ARRAY_OFFSET +private val maxPos = 
Platform.BYTE_ARRAY_OFFSET + valueBytes.length + +override def hasNext: Boolean = { + pos < maxPos +} + +override def next(): UnsafeRow = { + val numBytes = Platform.getInt(valueBytes, pos) + + pos += java.lang.Integer.BYTES + val encodedValue = new Array[Byte](numBytes) + Platform.copyMemory(valueBytes, pos, +encodedValue, Platform.BYTE_ARRAY_OFFSET, numBytes) + + pos += numBytes + pos += 1 // eat the delimiter character + decodeToUnsafeRow(encodedValue, valueRow) +} + } +} + } + override def supportsMultipleValuesPerKey: Boolean = true Review Comment: Done.
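The value layout described in the Scaladoc above (a 4-byte size followed by the encoded row, with merged values joined by a one-byte delimiter) can be sketched independently of Spark as follows. This is only an illustration under stated assumptions: `LengthPrefixed`, its method names, and the `','` delimiter are hypothetical, plain byte arrays stand in for encoded `UnsafeRow`s, and `java.nio.ByteBuffer` (big-endian by default) replaces the `Platform` calls used in the PR.

```scala
import java.nio.ByteBuffer

// Hypothetical helper; not part of Spark or of the PR under review.
object LengthPrefixed {
  // Assumed one-byte delimiter placed between merged values.
  val Delimiter: Byte = ','.toByte

  // |--size (4 bytes)--|--payload bytes--|
  def encodeValue(payload: Array[Byte]): Array[Byte] =
    ByteBuffer.allocate(Integer.BYTES + payload.length)
      .putInt(payload.length)
      .put(payload)
      .array()

  // Mimics an append-style merge: existing bytes, one delimiter, then the new value.
  def merge(existing: Array[Byte], next: Array[Byte]): Array[Byte] =
    (existing :+ Delimiter) ++ next

  // Walks the buffer using the size prefix, skipping one delimiter byte between values.
  def decodeValues(bytes: Array[Byte]): List[Array[Byte]] = {
    val buf = ByteBuffer.wrap(bytes)
    val out = List.newBuilder[Array[Byte]]
    while (buf.hasRemaining) {
      val len = buf.getInt()
      val payload = new Array[Byte](len)
      buf.get(payload)
      out += payload
      if (buf.hasRemaining) buf.get() // eat the delimiter
    }
    out.result()
  }
}
```

Because the decoder advances by the size prefix rather than scanning for the delimiter, a payload may itself contain the delimiter byte without confusing the reader.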
Re: [PR] [WIP][SPARK-46947][CORE] Delay memory manager initialization until Driver plugin is loaded [spark]
dongjoon-hyun commented on code in PR #45052: URL: https://github.com/apache/spark/pull/45052#discussion_r1480666905 ## core/src/main/scala/org/apache/spark/SparkEnv.scala: ## @@ -67,7 +67,6 @@ class SparkEnv ( val blockManager: BlockManager, val securityManager: SecurityManager, val metricsSystem: MetricsSystem, -val memoryManager: MemoryManager, Review Comment: Could you avoid this breaking change by adding a new constructor, @sunchao ?
Re: [PR] [SS][SPARK-46928] Add support for ListState in Arbitrary State API v2. [spark]
sahnib commented on code in PR #44961: URL: https://github.com/apache/spark/pull/44961#discussion_r148084 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala: ## @@ -316,6 +321,25 @@ class RocksDB( } } + def merge(key: Array[Byte], value: Array[Byte], +colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Unit = { Review Comment: Done.
Re: [PR] [SS][SPARK-46928] Add support for ListState in Arbitrary State API v2. [spark]
sahnib commented on code in PR #44961: URL: https://github.com/apache/spark/pull/44961#discussion_r1480666519 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala: ## @@ -316,6 +321,25 @@ class RocksDB( } } + def merge(key: Array[Byte], value: Array[Byte], Review Comment: Yes, thanks for catching. Added
Re: [PR] [WIP][SPARK-46947][CORE] Delay memory manager initialization until Driver plugin is loaded [spark]
dongjoon-hyun commented on code in PR #45052: URL: https://github.com/apache/spark/pull/45052#discussion_r1480666428 ## core/src/main/scala/org/apache/spark/SparkContext.scala: ## @@ -578,6 +578,8 @@ class SparkContext(config: SparkConf) extends Logging { // Initialize any plugins before the task scheduler is initialized. _plugins = PluginContainer(this, _resources.asJava) _env.initializeShuffleManager() +_env.initializeMemoryManager(SparkContext.numDriverCores(master, conf)) + Review Comment: nit. extra empty line
Re: [PR] [SS][SPARK-46928] Add support for ListState in Arbitrary State API v2. [spark]
anishshri-db commented on code in PR #44961: URL: https://github.com/apache/spark/pull/44961#discussion_r1480665787 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala: ## @@ -316,6 +321,25 @@ class RocksDB( } } + def merge(key: Array[Byte], value: Array[Byte], +colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Unit = { +verifyColFamilyExists(colFamilyName) + +if (conf.trackTotalNumberOfRows) { + val oldValue = db.get(colFamilyNameToHandleMap(colFamilyName), readOptions, key) + if (oldValue == null) { +numKeysOnWritingVersion += 1 + } +} +db.merge(colFamilyNameToHandleMap(colFamilyName), writeOptions, key, value) + +if (useColumnFamilies) { + changelogWriter.foreach(_.merge(key, value, colFamilyName)) +} else { + changelogWriter.foreach(_.merge(key, value)) Review Comment: Yeah, let's support it for the default col family - that could be useful in the future
Re: [PR] [SS][SPARK-46928] Add support for ListState in Arbitrary State API v2. [spark]
sahnib commented on code in PR #44961: URL: https://github.com/apache/spark/pull/44961#discussion_r1480664465 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala: ## @@ -316,6 +321,25 @@ class RocksDB( } } + def merge(key: Array[Byte], value: Array[Byte], +colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Unit = { +verifyColFamilyExists(colFamilyName) + +if (conf.trackTotalNumberOfRows) { + val oldValue = db.get(colFamilyNameToHandleMap(colFamilyName), readOptions, key) + if (oldValue == null) { +numKeysOnWritingVersion += 1 + } +} +db.merge(colFamilyNameToHandleMap(colFamilyName), writeOptions, key, value) + +if (useColumnFamilies) { + changelogWriter.foreach(_.merge(key, value, colFamilyName)) +} else { + changelogWriter.foreach(_.merge(key, value)) Review Comment: I agree we probably won't use it in the near future, but I don't see a reason to restrict it only to column families. The changes work as expected, and can be used for the `default` column family in the future if the need arises.
Re: [PR] [SS][SPARK-46928] Add support for ListState in Arbitrary State API v2. [spark]
sahnib commented on code in PR #44961: URL: https://github.com/apache/spark/pull/44961#discussion_r1480662135 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateTypesEncoderUtils.scala: ## @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import org.apache.commons.lang3.SerializationUtils + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder +import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.execution.streaming.state.StateStoreErrors +import org.apache.spark.sql.types.{BinaryType, StructType} + +/** + * Helper object providing APIs to encodes the grouping key, and user provided values + * to Spark [[UnsafeRow]]. + */ +object StateTypesEncoderUtils { + + // TODO: validate places that are trying to encode the key and check if we can eliminate/ + // add caching for some of these calls. 
+ def encodeGroupingKey(stateName: String, keyExprEnc: ExpressionEncoder[Any]): UnsafeRow = { +val keyOption = ImplicitGroupingKeyTracker.getImplicitKeyOption +if (keyOption.isEmpty) { + throw StateStoreErrors.implicitKeyNotFound(stateName) +} + +val toRow = keyExprEnc.createSerializer() +val keyByteArr = toRow + .apply(keyOption.get).asInstanceOf[UnsafeRow].getBytes() + +val schemaForKeyRow: StructType = new StructType().add("key", BinaryType) Review Comment: Done. I think class private val is okay. We don't expect them to change across State variables.
[PR] [WIP][SPARK-46947][CORE] Delay memory manager initialization until Driver plugin is loaded [spark]
sunchao opened a new pull request, #45052: URL: https://github.com/apache/spark/pull/45052 ### What changes were proposed in this pull request? ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? ### How was this patch tested? ### Was this patch authored or co-authored using generative AI tooling?
[PR] [SPARK-46913][WIP] Add support for processing/event time based timers with transformWithState operator [spark]
anishshri-db opened a new pull request, #45051: URL: https://github.com/apache/spark/pull/45051 ### What changes were proposed in this pull request? Add support for processing/event time based timers with transformWithState operator ### Why are the changes needed? Changes are required to add event-driven timer based support for stateful streaming applications based on state api v2 ### Does this PR introduce _any_ user-facing change? Yes ### How was this patch tested? Added unit tests ### Was this patch authored or co-authored using generative AI tooling? No
Re: [PR] [SPARK-46962][SS][PYTHON] Implement python worker to run python streaming data source [spark]
allisonwang-db commented on code in PR #45023: URL: https://github.com/apache/spark/pull/45023#discussion_r1480622631 ## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonScan.scala: ## @@ -45,13 +63,50 @@ class PythonScan( new PythonPartitionReaderFactory( ds.source, readerFunc, outputSchema, jobArtifactUUID) } +} - override def toBatch: Batch = this +case class PythonStreamingSourceOffset(json: String) extends Offset - override def description: String = "(Python)" +case class PythonStreamingSourcePartition(partition: Array[Byte]) extends InputPartition - override def readSchema(): StructType = outputSchema +class PythonMicroBatchStream( Review Comment: should we put this in a different file? ## sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonStreamingSourceRunner.scala: ## @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.spark.sql.execution.python + +import java.io.{BufferedInputStream, BufferedOutputStream, DataInputStream, DataOutputStream} + +import scala.collection.mutable.ArrayBuffer +import scala.jdk.CollectionConverters._ + +import org.apache.spark.SparkEnv +import org.apache.spark.api.python.{PythonFunction, PythonWorker, PythonWorkerFactory, PythonWorkerUtils, SpecialLengths} +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.BUFFER_SIZE +import org.apache.spark.internal.config.Python.{PYTHON_AUTH_SOCKET_TIMEOUT, PYTHON_USE_DAEMON} +import org.apache.spark.sql.errors.QueryCompilationErrors +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.StructType + +object PythonStreamingSourceRunner { + val partitionsFuncId = 886 + val latestOffsetsFuncId = 887 Review Comment: what are these hardcoded numbers? ## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonScan.scala: ## @@ -27,9 +30,24 @@ class PythonScan( ds: PythonDataSourceV2, shortName: String, outputSchema: StructType, - options: CaseInsensitiveStringMap) extends Batch with Scan { - private[this] val jobArtifactUUID = JobArtifactSet.getCurrentJobArtifactState.map(_.uuid) + override def toBatch: Batch = new PythonBatch(ds, shortName, outputSchema, options) + + override def toMicroBatchStream(checkpointLocation: String): MicroBatchStream = +new PythonMicroBatchStream(ds, shortName, outputSchema, options) + + override def description: String = "(Python)" + + override def readSchema(): StructType = outputSchema + + override def supportedCustomMetrics(): Array[CustomMetric] = +ds.source.createPythonMetrics() +} + +class PythonBatch(ds: PythonDataSourceV2, shortName: String, + outputSchema: StructType, options: CaseInsensitiveStringMap) extends Batch { Review Comment: nit: indent ##
sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonStreamingSourceRunner.scala: ## @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.spark.sql.execution.python + +import java.io.{BufferedInputStream, BufferedOutputStream, DataInputStream, DataOutputStream} + +import scala.collection.mutable.ArrayBuffer +import scala.jdk.CollectionConverters._ + +import org.apache.spark.SparkEnv +import
Re: [PR] [MINOR][PYTHON] refactor PythonWrite to prepare for supporting python data source streaming write [spark]
xinrong-meng commented on PR #45049: URL: https://github.com/apache/spark/pull/45049#issuecomment-1930884879 Would you create a Spark JIRA ticket at https://issues.apache.org/jira/browse/SPARK and add its ID to the PR title? Please refer to https://spark.apache.org/contributing.html for details. Thanks!
Re: [PR] [SPARK-46989][SQL][CONNECT] Improve concurrency performance for SparkSession [spark]
ueshin commented on code in PR #45046: URL: https://github.com/apache/spark/pull/45046#discussion_r1480617495 ## connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala: ## @@ -928,10 +929,10 @@ object SparkSession extends Logging { * * @since 3.5.0 */ -def config(map: Map[String, Any]): Builder = synchronized { +def config(map: Map[String, Any]): Builder = { map.foreach { kv: (String, Any) => { - options += kv._1 -> kv._2.toString + options.put(kv._1, kv._2.toString) Review Comment: This seems to be a behavior change: another thread can now read or write between individual `put` calls, which was not possible while the method was `synchronized`. How about using `putAll` instead?
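To illustrate the concern, here is a minimal sketch of a builder-like class that applies a whole batch of options under one lock, so a concurrent reader sees either none or all of the batch. `OptionsHolder`, `lock`, and `snapshot` are hypothetical names for illustration only, not Spark's actual `Builder`. Note also that whether `putAll` alone gives this guarantee depends on the map type: the `java.util.concurrent.ConcurrentHashMap` documentation states that for aggregate operations such as `putAll`, concurrent retrievals may reflect only some of the entries.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a builder that collects options; not Spark's actual Builder.
final class OptionsHolder {
  private val lock = new Object
  private val options = mutable.HashMap.empty[String, String]

  // Applies the whole batch while holding the lock, so no other thread
  // using this class can observe a partially applied batch.
  def config(map: Map[String, Any]): OptionsHolder = {
    lock.synchronized {
      map.foreach { case (k, v) => options.put(k, v.toString) }
    }
    this
  }

  // Returns an immutable copy taken under the same lock.
  def snapshot: Map[String, String] = lock.synchronized { options.toMap }
}
```

The trade-off is the one the PR is trying to remove: a single lock serializes all writers, so this only makes sense if atomic batches matter more than write throughput.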
Re: [PR] [SPARK-46984][PYTHON] Remove pyspark.copy_func [spark]
xinrong-meng commented on PR #45042: URL: https://github.com/apache/spark/pull/45042#issuecomment-1930881092 LGTM once tests pass, thank you!
Re: [PR] [SPARK-46987][CONNECT] `ProtoUtils.abbreviate` avoid unnecessary `setField` operation [spark]
xinrong-meng commented on PR #45045: URL: https://github.com/apache/spark/pull/45045#issuecomment-193088 LGTM once tests pass, thanks!
Re: [PR] [SPARK-46865][SS] Add Batch Support for TransformWithState Operator [spark]
ericm-db commented on code in PR #44884: URL: https://github.com/apache/spark/pull/44884#discussion_r1480608319 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala: ## @@ -155,23 +161,112 @@ case class TransformWithStateExec( override protected def doExecute(): RDD[InternalRow] = { metrics // force lazy init at driver -child.execute().mapPartitionsWithStateStore[InternalRow]( - getStateInfo, - schemaForKeyRow, - schemaForValueRow, - numColsPrefixKey = 0, - session.sqlContext.sessionState, - Some(session.sqlContext.streams.stateStoreCoordinator), - useColumnFamilies = true -) { - case (store: StateStore, singleIterator: Iterator[InternalRow]) => -val processorHandle = new StatefulProcessorHandleImpl(store, getStateInfo.queryRunId, - keyEncoder) -assert(processorHandle.getHandleState == StatefulProcessorHandleState.CREATED) -statefulProcessor.init(processorHandle, outputMode) - processorHandle.setHandleState(StatefulProcessorHandleState.INITIALIZED) -val result = processDataWithPartition(singleIterator, store, processorHandle) -result +if (isStreaming) { + child.execute().mapPartitionsWithStateStore[InternalRow]( +getStateInfo, +schemaForKeyRow, +schemaForValueRow, +numColsPrefixKey = 0, +session.sqlContext.sessionState, +Some(session.sqlContext.streams.stateStoreCoordinator), +useColumnFamilies = true + ) { +case (store: StateStore, singleIterator: Iterator[InternalRow]) => + processData(store, singleIterator) + } +} else { + // If the query is running in batch mode, we need to create a new StateStore and instantiate + // a temp directory on the executors in mapPartitionsWithIndex. 
+ child.execute().mapPartitionsWithIndex[InternalRow]( +(i, iter) => { + val providerId = { +// lazy creation to initialize tempDirPath once +lazy val tempDirPath = Utils.createTempDir().getAbsolutePath Review Comment: I thought it was good enough as long as it wasn't initialized once per partition; once per executor seemed okay.
Re: [PR] [SPARK-46865][SS] Add Batch Support for TransformWithState Operator [spark]
anishshri-db commented on code in PR #44884: URL: https://github.com/apache/spark/pull/44884#discussion_r1480600742 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala: ## @@ -155,23 +161,112 @@ case class TransformWithStateExec( override protected def doExecute(): RDD[InternalRow] = { metrics // force lazy init at driver -child.execute().mapPartitionsWithStateStore[InternalRow]( - getStateInfo, - schemaForKeyRow, - schemaForValueRow, - numColsPrefixKey = 0, - session.sqlContext.sessionState, - Some(session.sqlContext.streams.stateStoreCoordinator), - useColumnFamilies = true -) { - case (store: StateStore, singleIterator: Iterator[InternalRow]) => -val processorHandle = new StatefulProcessorHandleImpl(store, getStateInfo.queryRunId, - keyEncoder) -assert(processorHandle.getHandleState == StatefulProcessorHandleState.CREATED) -statefulProcessor.init(processorHandle, outputMode) - processorHandle.setHandleState(StatefulProcessorHandleState.INITIALIZED) -val result = processDataWithPartition(singleIterator, store, processorHandle) -result +if (isStreaming) { + child.execute().mapPartitionsWithStateStore[InternalRow]( +getStateInfo, +schemaForKeyRow, +schemaForValueRow, +numColsPrefixKey = 0, +session.sqlContext.sessionState, +Some(session.sqlContext.streams.stateStoreCoordinator), +useColumnFamilies = true + ) { +case (store: StateStore, singleIterator: Iterator[InternalRow]) => + processData(store, singleIterator) + } +} else { + // If the query is running in batch mode, we need to create a new StateStore and instantiate + // a temp directory on the executors in mapPartitionsWithIndex. + child.execute().mapPartitionsWithIndex[InternalRow]( +(i, iter) => { + val providerId = { +// lazy creation to initialize tempDirPath once +lazy val tempDirPath = Utils.createTempDir().getAbsolutePath Review Comment: I don't think this will evaluate once. But do we need it to evaluate once ? 
I thought we were fine passing a tmp path to each executor here, given that the store instance is not tracked or checkpointed?
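The question of how often the `lazy val` is evaluated can be checked outside Spark. The sketch below is plain Scala under stated assumptions: `LazyValScope`, `expensiveInit`, `perCall`, and `shared` are hypothetical names, and `expensiveInit` merely stands in for something like creating a temp directory. It shows that a `lazy val` declared inside a function body is a fresh local on every invocation, whereas a `lazy val` on an enclosing object is initialized at most once per JVM.

```scala
object LazyValScope {
  // Counts how many times the stand-in initializer runs.
  var initCount = 0

  // Hypothetical stand-in for an expensive call such as creating a temp directory.
  def expensiveInit(): String = {
    initCount += 1
    s"tmp-dir-$initCount"
  }

  // The lazy val lives inside the function body, so each call gets its own
  // local and runs the initializer again when the value is first used.
  val perCall: Int => String = { partition =>
    lazy val path = expensiveInit()
    s"$path/part-$partition"
  }

  // The lazy val lives on the object, so it is initialized at most once
  // for the lifetime of this object in the JVM.
  lazy val sharedPath: String = expensiveInit()
  val shared: Int => String = partition => s"$sharedPath/part-$partition"
}
```

Applied to the diff above, this suggests the function passed to `mapPartitionsWithIndex` would create one temp path per partition it processes, which matches the first half of the comment; whether that is acceptable is the design question being discussed.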
[PR] [SPARK-46689][SPARK-46690][PYTHON][CONNECT] Support v2 profiling in group/cogroup applyInPandas [spark]
xinrong-meng opened a new pull request, #45050: URL: https://github.com/apache/spark/pull/45050 ### What changes were proposed in this pull request? Support v2 (perf, memory) profiling in group/cogroup applyInPandas, which rely on the physical plan nodes FlatMapGroupsInBatchExec and FlatMapCoGroupsInBatchExec. ### Why are the changes needed? To complete v2 profiling support. ### Does this PR introduce _any_ user-facing change? Yes. V2 profiling in group/cogroup applyInPandas is supported. ### How was this patch tested? Unit tests. ### Was this patch authored or co-authored using generative AI tooling? No.
Re: [PR] [SS][SPARK-46928] Add support for ListState in Arbitrary State API v2. [spark]
anishshri-db commented on code in PR #44961: URL: https://github.com/apache/spark/pull/44961#discussion_r1480592060 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala: ## @@ -65,6 +65,43 @@ private[sql] class RocksDBStateStoreProvider value } +override def valuesIterator(key: UnsafeRow, colFamilyName: String): Iterator[UnsafeRow] = { + verify(key != null, "Key cannot be null") + verify(encoder.supportsMultipleValuesPerKey, "valuesIterator requires a encoder " + + "that supports multiple values for a single key.") + val valueIterator = encoder.decodeValues(rocksDB.get(encoder.encodeKey(key), colFamilyName)) + + if (!isValidated && valueIterator.nonEmpty) { Review Comment: Hmm, why do we have this check?
Re: [PR] [SS][SPARK-46928] Add support for ListState in Arbitrary State API v2. [spark]
anishshri-db commented on code in PR #44961: URL: https://github.com/apache/spark/pull/44961#discussion_r1480590932 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala: ## @@ -65,6 +65,43 @@ private[sql] class RocksDBStateStoreProvider value } +override def valuesIterator(key: UnsafeRow, colFamilyName: String): Iterator[UnsafeRow] = { + verify(key != null, "Key cannot be null") + verify(encoder.supportsMultipleValuesPerKey, "valuesIterator requires a encoder " + + "that supports multiple values for a single key.") + val valueIterator = encoder.decodeValues(rocksDB.get(encoder.encodeKey(key), colFamilyName)) + + if (!isValidated && valueIterator.nonEmpty) { +new Iterator[UnsafeRow] { + override def hasNext: Boolean = { +valueIterator.hasNext + } + + override def next(): UnsafeRow = { +val value = valueIterator.next() +if (!isValidated && value != null) { Review Comment: We should probably skip this if `useColumnFamilies` is enabled?
Re: [PR] [SS][SPARK-46928] Add support for ListState in Arbitrary State API v2. [spark]
anishshri-db commented on code in PR #44961: URL: https://github.com/apache/spark/pull/44961#discussion_r1480590125 ## sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithListStateSuite.scala: ## @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.streaming + +import org.apache.spark.SparkException +import org.apache.spark.sql.execution.streaming.MemoryStream +import org.apache.spark.sql.execution.streaming.state.{AlsoTestWithChangelogCheckpointingEnabled, RocksDBStateStoreProvider} +import org.apache.spark.sql.internal.SQLConf + +case class InputRow(key: String, action: String, value: String) + +class TestListStateProcessor + extends StatefulProcessor[String, InputRow, (String, String)] { + + @transient var _processorHandle: StatefulProcessorHandle = _ + @transient var _listState: ListState[String] = _ + + override def init(handle: StatefulProcessorHandle, outputMode: OutputMode): Unit = { +_processorHandle = handle +_listState = handle.getListState("testListState") + } + + override def handleInputRows(key: String, + rows: Iterator[InputRow], + timerValues: TimerValues): Iterator[(String, String)] = { + +var output = List[(String, String)]() + +for (row <- rows) { + if (row.action == "emit") { +output = (key, row.value) :: output + } else if (row.action == "emitAllInState") { +_listState.get().foreach(v => { + output = (key, v) :: output +}) +_listState.remove() + } else if (row.action == "append") { +_listState.appendValue(row.value) + } else if (row.action == "appendAll") { +_listState.appendList(row.value.split(",")) + } else if (row.action == "put") { +_listState.put(row.value.split(",")) + } else if (row.action == "remove") { +_listState.remove() + } else if (row.action == "tryAppendingNull") { +_listState.appendValue(null) + } else if (row.action == "tryAppendingNullValueInList") { +_listState.appendList(Array(null)) + } else if (row.action == "tryAppendingNullList") { +_listState.appendList(null) + } else if (row.action == "tryPutNullList") { +_listState.put(null) + } else if (row.action == "tryPuttingNullInList") { +_listState.put(Array(null)) + } +} + +output.iterator + } + + + override def close(): Unit = { + } +} + +class TransformWithListStateSuite extends 
StreamTest + with AlsoTestWithChangelogCheckpointingEnabled { + import testImplicits._ + + test("test appending null value in list state throw exception") { +withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key -> + classOf[RocksDBStateStoreProvider].getName) { + + val inputData = MemoryStream[InputRow] + val result = inputData.toDS() +.groupByKey(x => x.key) +.transformWithState(new TestListStateProcessor(), + TimeoutMode.NoTimeouts(), + OutputMode.Update()) + + testStream(result, OutputMode.Update()) ( +AddData(inputData, InputRow("k1", "tryAppendingNull", "")), +ExpectFailure[SparkException](e => { + assert(e.getMessage.contains("CANNOT_WRITE_STATE_STORE.NULL_VALUE")) +}) + ) +} + } + + test("test putting null value in list state throw exception") { +withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key -> + classOf[RocksDBStateStoreProvider].getName) { + + val inputData = MemoryStream[InputRow] + val result = inputData.toDS() +.groupByKey(x => x.key) +.transformWithState(new TestListStateProcessor(), + TimeoutMode.NoTimeouts(), + OutputMode.Update()) + + testStream(result, OutputMode.Update())( +AddData(inputData, InputRow("k1", "tryPuttingNullInList", "")), +ExpectFailure[SparkException](e => { + assert(e.getMessage.contains("CANNOT_WRITE_STATE_STORE.NULL_VALUE")) +}) + ) +} + } + + test("test putting null list in list state throw exception") { +withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key -> + classOf[RocksDBStateStoreProvider].getName) { + + val inputData =
Re: [PR] [SS][SPARK-46928] Add support for ListState in Arbitrary State API v2. [spark]
anishshri-db commented on code in PR #44961: URL: https://github.com/apache/spark/pull/44961#discussion_r1480589830 ## sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithListStateSuite.scala: ## @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.streaming + +import org.apache.spark.SparkException +import org.apache.spark.sql.execution.streaming.MemoryStream +import org.apache.spark.sql.execution.streaming.state.{AlsoTestWithChangelogCheckpointingEnabled, RocksDBStateStoreProvider} +import org.apache.spark.sql.internal.SQLConf + +case class InputRow(key: String, action: String, value: String) + +class TestListStateProcessor + extends StatefulProcessor[String, InputRow, (String, String)] { + + @transient var _processorHandle: StatefulProcessorHandle = _ + @transient var _listState: ListState[String] = _ + + override def init(handle: StatefulProcessorHandle, outputMode: OutputMode): Unit = { +_processorHandle = handle +_listState = handle.getListState("testListState") + } + + override def handleInputRows(key: String, + rows: Iterator[InputRow], + timerValues: TimerValues): Iterator[(String, String)] = { + +var output = List[(String, String)]() + +for (row <- rows) { + if (row.action == "emit") { +output = (key, row.value) :: output + } else if (row.action == "emitAllInState") { +_listState.get().foreach(v => { + output = (key, v) :: output +}) +_listState.remove() + } else if (row.action == "append") { +_listState.appendValue(row.value) + } else if (row.action == "appendAll") { +_listState.appendList(row.value.split(",")) + } else if (row.action == "put") { +_listState.put(row.value.split(",")) + } else if (row.action == "remove") { +_listState.remove() + } else if (row.action == "tryAppendingNull") { +_listState.appendValue(null) + } else if (row.action == "tryAppendingNullValueInList") { +_listState.appendList(Array(null)) + } else if (row.action == "tryAppendingNullList") { +_listState.appendList(null) + } else if (row.action == "tryPutNullList") { +_listState.put(null) + } else if (row.action == "tryPuttingNullInList") { +_listState.put(Array(null)) + } +} + +output.iterator + } + + + override def close(): Unit = { + } +} + +class TransformWithListStateSuite extends 
StreamTest + with AlsoTestWithChangelogCheckpointingEnabled { + import testImplicits._ + + test("test appending null value in list state throw exception") { Review Comment: Should we move these tests to a separate suite such as `TransformWithListStateValidationSuite`?
Re: [PR] [SS][SPARK-46928] Add support for ListState in Arbitrary State API v2. [spark]
anishshri-db commented on code in PR #44961: URL: https://github.com/apache/spark/pull/44961#discussion_r1480588124 ## sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithListStateSuite.scala: ## @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.streaming + +import org.apache.spark.SparkException +import org.apache.spark.sql.execution.streaming.MemoryStream +import org.apache.spark.sql.execution.streaming.state.{AlsoTestWithChangelogCheckpointingEnabled, RocksDBStateStoreProvider} +import org.apache.spark.sql.internal.SQLConf + +case class InputRow(key: String, action: String, value: String) + +class TestListStateProcessor + extends StatefulProcessor[String, InputRow, (String, String)] { + + @transient var _processorHandle: StatefulProcessorHandle = _ + @transient var _listState: ListState[String] = _ + + override def init(handle: StatefulProcessorHandle, outputMode: OutputMode): Unit = { +_processorHandle = handle +_listState = handle.getListState("testListState") + } + + override def handleInputRows(key: String, + rows: Iterator[InputRow], + timerValues: TimerValues): Iterator[(String, String)] = { + +var output = List[(String, String)]() + +for (row <- rows) { + if (row.action == "emit") { +output = (key, row.value) :: output + } else if (row.action == "emitAllInState") { +_listState.get().foreach(v => { + output = (key, v) :: output +}) +_listState.remove() + } else if (row.action == "append") { +_listState.appendValue(row.value) + } else if (row.action == "appendAll") { +_listState.appendList(row.value.split(",")) + } else if (row.action == "put") { +_listState.put(row.value.split(",")) + } else if (row.action == "remove") { +_listState.remove() + } else if (row.action == "tryAppendingNull") { +_listState.appendValue(null) + } else if (row.action == "tryAppendingNullValueInList") { +_listState.appendList(Array(null)) + } else if (row.action == "tryAppendingNullList") { +_listState.appendList(null) + } else if (row.action == "tryPutNullList") { +_listState.put(null) + } else if (row.action == "tryPuttingNullInList") { +_listState.put(Array(null)) + } +} + +output.iterator + } + + Review Comment: Nit: extra newline ? 
Re: [PR] [SS][SPARK-46928] Add support for ListState in Arbitrary State API v2. [spark]
anishshri-db commented on code in PR #44961: URL: https://github.com/apache/spark/pull/44961#discussion_r1480587660 ## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala: ## @@ -845,6 +859,96 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared } } + testWithChangelogCheckpointingEnabled("ensure merge operation is not supported" + +" with changelog checkpoint if column families is not enabled") { +withTempDir { dir => + val remoteDir = Utils.createTempDir().toString + val conf = dbConf.copy(minDeltasForSnapshot = 5, compactOnCommit = false) + new File(remoteDir).delete() // to make sure that the directory gets created + withDB(remoteDir, conf = conf, useColumnFamilies = false) { db => +db.load(0) +db.put("a", "1") +intercept[UnsupportedOperationException]( + db.merge("a", "2") +) + } +} + } + + testWithChangelogCheckpointingDisabled("ensure merge operation is supported" + +" without changelog checkpoint if column families is not enabled") { Review Comment: Hmm, why do we have these tests? Which case are we trying to cover here?
Re: [PR] [SS][SPARK-46928] Add support for ListState in Arbitrary State API v2. [spark]
anishshri-db commented on code in PR #44961: URL: https://github.com/apache/spark/pull/44961#discussion_r1480586073 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala: ## @@ -57,7 +57,10 @@ package object state { sessionState: SessionState, storeCoordinator: Option[StateStoreCoordinatorRef], useColumnFamilies: Boolean = false, -extraOptions: Map[String, String] = Map.empty)( +extraOptions: Map[String, String] = Map.empty, +// TODO: refactor using the boolean parameter for choosing stateful encoder properties Review Comment: Should we remove this comment?
Re: [PR] [SS][SPARK-46928] Add support for ListState in Arbitrary State API v2. [spark]
anishshri-db commented on code in PR #44961: URL: https://github.com/apache/spark/pull/44961#discussion_r1480578009 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala: ## @@ -222,6 +232,31 @@ class StateStoreChangelogWriterV2( size += 1 } + override def merge(key: Array[Byte], value: Array[Byte]): Unit = { Review Comment: This interface is not supported in both v1 and v2 writers. Should we just remove this then?
Re: [PR] [SS][SPARK-46928] Add support for ListState in Arbitrary State API v2. [spark]
anishshri-db commented on code in PR #44961: URL: https://github.com/apache/spark/pull/44961#discussion_r1480576634 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala: ## @@ -215,7 +253,13 @@ private[sql] class RocksDBStateStoreProvider (keySchema.length > numColsPrefixKey), "The number of columns in the key must be " + "greater than the number of columns for prefix key!") -this.encoder = RocksDBStateEncoder.getEncoder(keySchema, valueSchema, numColsPrefixKey) +if (useMultipleValuesPerKey) { Review Comment: After my refactoring change, we can get rid of this check, I guess.
Re: [PR] [SS][SPARK-46928] Add support for ListState in Arbitrary State API v2. [spark]
anishshri-db commented on code in PR #44961: URL: https://github.com/apache/spark/pull/44961#discussion_r1480575858 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala: ## @@ -249,4 +259,109 @@ class NoPrefixKeyStateEncoder(keySchema: StructType, valueSchema: StructType) override def encodePrefixKey(prefixKey: UnsafeRow): Array[Byte] = { throw new IllegalStateException("This encoder doesn't support prefix key!") } + + override def supportsMultipleValuesPerKey: Boolean = false + + override def decodeValues(valueBytes: Array[Byte]): Iterator[UnsafeRow] = { +throw new UnsupportedOperationException("encoder does not support multiple values per key") + } +} + +/** + * Supports encoding multiple values per key in RocksDB. + * A single value is encoded in the format below, where first value is number of bytes + * in actual encodedUnsafeRow followed by the encoded value itself. + * + * |---size(bytes)--|--unsafeRowEncodedBytes--| + * + * Multiple values are separated by a delimiter character. + * + * This encoder supports RocksDB StringAppendOperator merge operator. Values encoded can be + * merged in RocksDB using merge operation, and all merged values can be read using decodeValues + * operation. 
+ */ +class MultiValuedStateEncoder(keySchema: StructType, valueSchema: StructType) + extends RocksDBStateEncoder with Logging { + + import RocksDBStateEncoder._ + + // Reusable objects + private val keyRow = new UnsafeRow(keySchema.size) + private val valueRow = new UnsafeRow(valueSchema.size) + private val rowTuple = new UnsafeRowPair() + + override def supportPrefixKeyScan: Boolean = false + + override def encodePrefixKey(prefixKey: UnsafeRow): Array[Byte] = { +throw new IllegalStateException("This encoder doesn't support prefix key!") + } + + override def extractPrefixKey(key: UnsafeRow): UnsafeRow = { +throw new IllegalStateException("This encoder doesn't support prefix key!") + } + + override def encodeKey(row: UnsafeRow): Array[Byte] = { +encodeUnsafeRow(row) + } + + override def encodeValue(row: UnsafeRow): Array[Byte] = { +val bytes = encodeUnsafeRow(row) +val numBytes = bytes.length + +val encodedBytes = new Array[Byte](java.lang.Integer.BYTES + bytes.length) +Platform.putInt(encodedBytes, Platform.BYTE_ARRAY_OFFSET, numBytes) +Platform.copyMemory(bytes, Platform.BYTE_ARRAY_OFFSET, + encodedBytes, java.lang.Integer.BYTES + Platform.BYTE_ARRAY_OFFSET, bytes.length) + +encodedBytes + } + + override def decodeKey(keyBytes: Array[Byte]): UnsafeRow = { +decodeToUnsafeRow(keyBytes, keyRow) + } + + override def decodeValue(valueBytes: Array[Byte]): UnsafeRow = { +if (valueBytes == null) { + null +} else { + val numBytes = Platform.getInt(valueBytes, Platform.BYTE_ARRAY_OFFSET) + val encodedValue = new Array[Byte](numBytes) + Platform.copyMemory(valueBytes, java.lang.Integer.BYTES + Platform.BYTE_ARRAY_OFFSET, +encodedValue, Platform.BYTE_ARRAY_OFFSET, numBytes) + decodeToUnsafeRow(encodedValue, valueRow) +} + } + + override def decodeValues(valueBytes: Array[Byte]): Iterator[UnsafeRow] = { +if (valueBytes == null) { + Seq().iterator +} else { + new Iterator[UnsafeRow] { +private var pos: Int = Platform.BYTE_ARRAY_OFFSET +private val maxPos = 
Platform.BYTE_ARRAY_OFFSET + valueBytes.length + +override def hasNext: Boolean = { + pos < maxPos +} + +override def next(): UnsafeRow = { + val numBytes = Platform.getInt(valueBytes, pos) + + pos += java.lang.Integer.BYTES + val encodedValue = new Array[Byte](numBytes) + Platform.copyMemory(valueBytes, pos, +encodedValue, Platform.BYTE_ARRAY_OFFSET, numBytes) + + pos += numBytes + pos += 1 // eat the delimiter character + decodeToUnsafeRow(encodedValue, valueRow) +} + } +} + } + override def supportsMultipleValuesPerKey: Boolean = true Review Comment: Nit: can we add a newline here ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
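For readers following the value layout described in the `MultiValuedStateEncoder` doc comment above, here is a minimal standalone sketch of the same idea: each value is written as a 4-byte length followed by its bytes, merged values are joined by a one-byte delimiter, and decoding walks the buffer using the lengths. This is not the PR's code. The object name `LengthPrefixedValues`, the `merge` helper that simulates an append-style merge, the `,` delimiter, and the use of `java.nio.ByteBuffer` (big-endian) in place of `Platform` are all assumptions made for illustration.

```scala
import java.nio.ByteBuffer

// Illustrative stand-in, not the Spark implementation.
object LengthPrefixedValues {
  // Assumed one-byte delimiter inserted between merged values.
  val Delimiter: Byte = ','.toByte

  /** Encode one value as |4-byte length|payload bytes|. */
  def encodeValue(payload: Array[Byte]): Array[Byte] = {
    val buf = ByteBuffer.allocate(Integer.BYTES + payload.length)
    buf.putInt(payload.length)
    buf.put(payload)
    buf.array()
  }

  /** Simulate an append-style merge: existing, then delimiter, then next. */
  def merge(existing: Array[Byte], next: Array[Byte]): Array[Byte] =
    existing ++ Array(Delimiter) ++ next

  /** Iterate over all payloads in a merged byte array. */
  def decodeValues(bytes: Array[Byte]): Iterator[Array[Byte]] =
    if (bytes == null) Iterator.empty
    else new Iterator[Array[Byte]] {
      private val buf = ByteBuffer.wrap(bytes)

      override def hasNext: Boolean = buf.hasRemaining

      override def next(): Array[Byte] = {
        val out = new Array[Byte](buf.getInt()) // read the length prefix
        buf.get(out)                            // read the payload
        if (buf.hasRemaining) buf.get()         // skip the delimiter, if any
        out
      }
    }
}
```

Because the reader relies on the length prefix rather than scanning for the delimiter, a payload may itself contain the delimiter byte without confusing the decoder.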
Re: [PR] [SS][SPARK-46928] Add support for ListState in Arbitrary State API v2. [spark]
anishshri-db commented on code in PR #44961: URL: https://github.com/apache/spark/pull/44961#discussion_r1480574813 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala: ## @@ -316,6 +321,25 @@ class RocksDB( } } + def merge(key: Array[Byte], value: Array[Byte], +colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Unit = { Review Comment: Also, maybe indent all args by 4 spaces and put each on its own line, similar to the functions above?
Re: [PR] [SS][SPARK-46928] Add support for ListState in Arbitrary State API v2. [spark]
anishshri-db commented on code in PR #44961: URL: https://github.com/apache/spark/pull/44961#discussion_r1480574441 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala: ## @@ -316,6 +321,25 @@ class RocksDB( } } + def merge(key: Array[Byte], value: Array[Byte], Review Comment: Could we add a function level comment here ?
Re: [PR] [SS][SPARK-46928] Add support for ListState in Arbitrary State API v2. [spark]
anishshri-db commented on code in PR #44961: URL: https://github.com/apache/spark/pull/44961#discussion_r1480574178 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala: ## @@ -316,6 +321,25 @@ class RocksDB( } } + def merge(key: Array[Byte], value: Array[Byte], +colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Unit = { +verifyColFamilyExists(colFamilyName) + +if (conf.trackTotalNumberOfRows) { + val oldValue = db.get(colFamilyNameToHandleMap(colFamilyName), readOptions, key) + if (oldValue == null) { +numKeysOnWritingVersion += 1 + } +} +db.merge(colFamilyNameToHandleMap(colFamilyName), writeOptions, key, value) + +if (useColumnFamilies) { + changelogWriter.foreach(_.merge(key, value, colFamilyName)) +} else { + changelogWriter.foreach(_.merge(key, value)) Review Comment: We want to support this for the non-col-families case too ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SS][SPARK-46928] Add support for ListState in Arbitrary State API v2. [spark]
anishshri-db commented on code in PR #44961: URL: https://github.com/apache/spark/pull/44961#discussion_r1480573652 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala: ## @@ -117,6 +118,7 @@ class RocksDB( dbOptions.setTableFormatConfig(tableFormatConfig) dbOptions.setMaxOpenFiles(conf.maxOpenFiles) dbOptions.setAllowFAllocate(conf.allowFAllocate) + dbOptions.setMergeOperator(new StringAppendOperator()) Review Comment: Oh nice - I already made this change then :)
Re: [PR] [SPARK-46979][SS] Add support for specifying key and value encoder separately and also for each col family in RocksDB state store provider [spark]
anishshri-db commented on code in PR #45038: URL: https://github.com/apache/spark/pull/45038#discussion_r1480570588 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala: ## @@ -48,54 +50,86 @@ private[sql] class RocksDBStateStoreProvider override def version: Long = lastVersion -override def createColFamilyIfAbsent(colFamilyName: String): Unit = { +override def createColFamilyIfAbsent( +colFamilyName: String, +keySchema: StructType, +numColsPrefixKey: Int, +valueSchema: StructType): Unit = { verify(colFamilyName != StateStore.DEFAULT_COL_FAMILY_NAME, s"Failed to create column family with reserved_name=$colFamilyName") + verify(useColumnFamilies, "Column families are not supported in this store") rocksDB.createColFamilyIfAbsent(colFamilyName) + encoderMapLock.synchronized { +keyEncoderMap.getOrElseUpdate(colFamilyName, + RocksDBStateEncoder.getKeyEncoder(keySchema, numColsPrefixKey)) + +valueEncoderMap.getOrElseUpdate(colFamilyName, + RocksDBStateEncoder.getValueEncoder(valueSchema)) + } Review Comment: If the entry exists, then we won't replace it actually with the current changes. But on restart, this can happen. However, for the col family case - we need to decide interaction with state schema compatibility checker anyway. So will prefer to do this in a separate PR -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-46979][SS] Add support for specifying key and value encoder separately and also for each col family in RocksDB state store provider [spark]
anishshri-db commented on code in PR #45038: URL: https://github.com/apache/spark/pull/45038#discussion_r1480567343 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala: ## @@ -287,7 +332,13 @@ private[sql] class RocksDBStateStoreProvider useColumnFamilies) } - @volatile private var encoder: RocksDBStateEncoder = _ + private val encoderMapLock = new Object + + @GuardedBy("encoderMapLock") + @volatile private var keyEncoderMap = new mutable.HashMap[String, RocksDBKeyStateEncoder] + + @GuardedBy("encoderMapLock") + @volatile private var valueEncoderMap = new mutable.HashMap[String, RocksDBValueStateEncoder] Review Comment: Done ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala: ## @@ -287,7 +332,13 @@ private[sql] class RocksDBStateStoreProvider useColumnFamilies) } - @volatile private var encoder: RocksDBStateEncoder = _ + private val encoderMapLock = new Object Review Comment: Removed this -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SS][SPARK-46928] Add support for ListState in Arbitrary State API v2. [spark]
anishshri-db commented on code in PR #44961: URL: https://github.com/apache/spark/pull/44961#discussion_r1480564218 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateTypesEncoderUtils.scala: ## @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import org.apache.commons.lang3.SerializationUtils + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder +import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.execution.streaming.state.StateStoreErrors +import org.apache.spark.sql.types.{BinaryType, StructType} + +/** + * Helper object providing APIs to encodes the grouping key, and user provided values + * to Spark [[UnsafeRow]]. + */ +object StateTypesEncoderUtils { + + // TODO: validate places that are trying to encode the key and check if we can eliminate/ + // add caching for some of these calls. 
+ def encodeGroupingKey(stateName: String, keyExprEnc: ExpressionEncoder[Any]): UnsafeRow = { +val keyOption = ImplicitGroupingKeyTracker.getImplicitKeyOption +if (keyOption.isEmpty) { + throw StateStoreErrors.implicitKeyNotFound(stateName) +} + +val toRow = keyExprEnc.createSerializer() +val keyByteArr = toRow + .apply(keyOption.get).asInstanceOf[UnsafeRow].getBytes() + +val schemaForKeyRow: StructType = new StructType().add("key", BinaryType) Review Comment: Maybe just store as class/singleton private members once ? i wonder whether we should just accept this as a function argument ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SS][SPARK-46928] Add support for ListState in Arbitrary State API v2. [spark]
anishshri-db commented on code in PR #44961: URL: https://github.com/apache/spark/pull/44961#discussion_r1480561627 ## sql/api/src/main/scala/org/apache/spark/sql/streaming/ListState.scala: ## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.streaming + +import org.apache.spark.annotation.{Evolving, Experimental} + +@Experimental +@Evolving +/** + * Interface used for arbitrary stateful operations with the v2 API to capture + * list value state. + */ +private[sql] trait ListState[S] extends Serializable { + + /** Whether state exists or not. */ + def exists(): Boolean + + /** Get the state value if it exists */ + def get(): Iterator[S] + + /** Get the list value as an option if it exists and None otherwise */ + def getOption(): Option[Iterator[S]] + + /** Update the value of the list. */ + def put(newState: Array[S]): Unit + + /** Append an entry to the list */ + def appendValue(newState: S): Unit + + /** Append an entire list to the existing value */ + def appendList(newState: Array[S]): Unit + + /** Remove this state. */ + def remove(): Unit Review Comment: Yes please go ahead -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SS][SPARK-46928] Add support for ListState in Arbitrary State API v2. [spark]
sahnib commented on code in PR #44961: URL: https://github.com/apache/spark/pull/44961#discussion_r1480557033 ## sql/api/src/main/scala/org/apache/spark/sql/streaming/ListState.scala: ## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.streaming + +import org.apache.spark.annotation.{Evolving, Experimental} + +@Experimental +@Evolving +/** + * Interface used for arbitrary stateful operations with the v2 API to capture + * list value state. + */ +private[sql] trait ListState[S] extends Serializable { + + /** Whether state exists or not. */ + def exists(): Boolean + + /** Get the state value if it exists */ + def get(): Iterator[S] + + /** Get the list value as an option if it exists and None otherwise */ + def getOption(): Option[Iterator[S]] + + /** Update the value of the list. */ + def put(newState: Array[S]): Unit + + /** Append an entry to the list */ + def appendValue(newState: S): Unit + + /** Append an entire list to the existing value */ + def appendList(newState: Array[S]): Unit + + /** Remove this state. */ + def remove(): Unit Review Comment: Sounds good. I think we should rename ValueState accordingly then. 
Do you see any concerns with renaming `remove` to `clear` in both? If not, I will go ahead and make the change.
Re: [PR] [SS][SPARK-46928] Add support for ListState in Arbitrary State API v2. [spark]
sahnib commented on PR #44961: URL: https://github.com/apache/spark/pull/44961#issuecomment-1930775538 cc: @HeartSaVioR PTAL, thanks!
Re: [PR] [SPARK-46526][SQL] Support LIMIT over correlated subqueries where predicates only reference outer table [spark]
agubichev commented on PR #44514: URL: https://github.com/apache/spark/pull/44514#issuecomment-1930746115 @cloud-fan
Re: [PR] [MINOR][PYTHON] refactor PythonWrite to prepare for supporting python data source streaming write [spark]
chaoqin-li1123 commented on PR #45049: URL: https://github.com/apache/spark/pull/45049#issuecomment-1930626019 @HyukjinKwon @HeartSaVioR @allisonwang-db PTAL, thanks!
[PR] [MINOR][PYTHON] refactor PythonWrite to prepare for supporting python source streaming write [spark]
chaoqin-li1123 opened a new pull request, #45049: URL: https://github.com/apache/spark/pull/45049

### What changes were proposed in this pull request?
Move PythonBatchWrite out of PythonWrite.

### Why are the changes needed?
This is to prepare for supporting python data source streaming write in the future.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Trivial code refactoring, existing test sufficient.

### Was this patch authored or co-authored using generative AI tooling?
No.
Re: [PR] [SPARK-46688][SPARK-46691][PYTHON][CONNECT] Support v2 profiling in aggregate Pandas UDFs [spark]
xinrong-meng commented on PR #45035: URL: https://github.com/apache/spark/pull/45035#issuecomment-1930607059

Test failure
```
FAIL [0.468s]: test_shuffle_data_with_multiple_locations (pyspark.tests.test_shuffle.MergerTests)
--
Traceback (most recent call last):
  File "/__w/spark/spark/python/pyspark/tests/test_shuffle.py", line 84, in test_shuffle_data_with_multiple_locations
    self.assertTrue(
AssertionError: False is not true
```
is irrelevant to the PR. Let me retrigger the tests.
Re: [PR] [SPARK-46979][SS] Add support for specifying key and value encoder separately and also for each col family in RocksDB state store provider [spark]
sahnib commented on code in PR #45038: URL: https://github.com/apache/spark/pull/45038#discussion_r1480430719 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala: ## @@ -48,54 +50,86 @@ private[sql] class RocksDBStateStoreProvider override def version: Long = lastVersion -override def createColFamilyIfAbsent(colFamilyName: String): Unit = { +override def createColFamilyIfAbsent( +colFamilyName: String, +keySchema: StructType, +numColsPrefixKey: Int, +valueSchema: StructType): Unit = { verify(colFamilyName != StateStore.DEFAULT_COL_FAMILY_NAME, s"Failed to create column family with reserved_name=$colFamilyName") + verify(useColumnFamilies, "Column families are not supported in this store") rocksDB.createColFamilyIfAbsent(colFamilyName) + encoderMapLock.synchronized { +keyEncoderMap.getOrElseUpdate(colFamilyName, + RocksDBStateEncoder.getKeyEncoder(keySchema, numColsPrefixKey)) + +valueEncoderMap.getOrElseUpdate(colFamilyName, + RocksDBStateEncoder.getValueEncoder(valueSchema)) + } Review Comment: Should we throw an exception if the passed key/value schemas differ from those of an existing (previously created) column family? Consider the scenario below:
1. User creates a colFamily with keySchema K, valueSchema V.
2. User issues the call again with keySchema K1, valueSchema V1. Note K1 != K, or V1 != V.
3. The call in (2) succeeds, but the encoders are using a different schema than the one passed in.
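The scenario in this comment can be made concrete with a small standalone sketch. It is only an illustration of the "fail on mismatch" behaviour being asked about, not code from the PR: `ColFamilySchemas` and `ColFamilyRegistry` are hypothetical names, and schemas are modelled as plain `Seq[String]` rather than Spark's `StructType` so that the example runs without Spark on the classpath.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a (keySchema, valueSchema) pair.
final case class ColFamilySchemas(keySchema: Seq[String], valueSchema: Seq[String])

// Hypothetical registry that rejects re-registration with different schemas.
class ColFamilyRegistry {
  private val registered = mutable.HashMap.empty[String, ColFamilySchemas]

  def createIfAbsent(name: String, schemas: ColFamilySchemas): Unit = synchronized {
    registered.get(name) match {
      case Some(existing) if existing != schemas =>
        // Step 2 of the scenario: same name, different schemas.
        throw new IllegalArgumentException(
          s"Column family $name already exists with different schemas")
      case Some(_) =>
        // Same schemas as before: treat the call as an idempotent no-op.
      case None =>
        registered.put(name, schemas)
    }
  }
}
```

With a plain `getOrElseUpdate`, the second call would return silently and keep the first entry, which is the situation the comment describes. Whether an exception is the right response also depends on how restarts and the state schema compatibility checker are handled, which the thread leaves to a separate PR.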
Re: [PR] [SPARK-46979][SS] Add support for specifying key and value encoder separately and also for each col family in RocksDB state store provider [spark]
anishshri-db commented on code in PR #45038: URL: https://github.com/apache/spark/pull/45038#discussion_r1480427117 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala: ## @@ -287,7 +332,13 @@ private[sql] class RocksDBStateStoreProvider useColumnFamilies) } - @volatile private var encoder: RocksDBStateEncoder = _ + private val encoderMapLock = new Object Review Comment: Not sure I understand - we would still have to protect the same hashMap right ?
Re: [PR] [SPARK-46979][SS] Add support for specifying key and value encoder separately and also for each col family in RocksDB state store provider [spark]
sahnib commented on code in PR #45038: URL: https://github.com/apache/spark/pull/45038#discussion_r1480416349 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala: ## @@ -287,7 +332,13 @@ private[sql] class RocksDBStateStoreProvider useColumnFamilies) } - @volatile private var encoder: RocksDBStateEncoder = _ + private val encoderMapLock = new Object Review Comment: [nit] Rename this to `columnFamilyEncoderMapLock`, as it only applies to column families. Encoders for the default column family are picked at state store init. ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala: ## @@ -287,7 +332,13 @@ private[sql] class RocksDBStateStoreProvider useColumnFamilies) } - @volatile private var encoder: RocksDBStateEncoder = _ + private val encoderMapLock = new Object + + @GuardedBy("encoderMapLock") + @volatile private var keyEncoderMap = new mutable.HashMap[String, RocksDBKeyStateEncoder] + + @GuardedBy("encoderMapLock") + @volatile private var valueEncoderMap = new mutable.HashMap[String, RocksDBValueStateEncoder] Review Comment: Can we just use a `ConcurrentHashMap` with signature `Map[String, (RocksDBKeyStateEncoder, RocksDBValueStateEncoder)]`? Then we don't need to synchronize using a lock.
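A rough sketch of the single-map suggestion, for illustration only: one `java.util.concurrent.ConcurrentHashMap` keyed by column family name holds the key and value encoders as a pair, and `computeIfAbsent` creates the pair atomically, so no separate lock object is needed. `KeyEncoder`, `ValueEncoder` and `EncoderRegistry` are hypothetical placeholders, not the `RocksDBKeyStateEncoder` / `RocksDBValueStateEncoder` types from the PR.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical placeholder types standing in for the real encoders.
final case class KeyEncoder(description: String)
final case class ValueEncoder(description: String)

// Hypothetical registry holding one (key, value) encoder pair per column family.
class EncoderRegistry {
  private val encoders =
    new ConcurrentHashMap[String, (KeyEncoder, ValueEncoder)]()

  /** Return the existing pair for `colFamily`, or atomically create and store one. */
  def getOrCreate(colFamily: String)(
      create: => (KeyEncoder, ValueEncoder)): (KeyEncoder, ValueEncoder) =
    encoders.computeIfAbsent(colFamily, _ => create)
}
```

Keeping both encoders in one entry also means the pair is always installed together, which two separate maps only achieve while the lock is held.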
Re: [PR] [SS][SPARK-46928] Add support for ListState in Arbitrary State API v2. [spark]
anishshri-db commented on code in PR #44961: URL: https://github.com/apache/spark/pull/44961#discussion_r1480419508 ## sql/api/src/main/scala/org/apache/spark/sql/streaming/ListState.scala: ## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.streaming + +import org.apache.spark.annotation.{Evolving, Experimental} + +@Experimental +@Evolving +/** + * Interface used for arbitrary stateful operations with the v2 API to capture + * list value state. + */ +private[sql] trait ListState[S] extends Serializable { + + /** Whether state exists or not. */ + def exists(): Boolean + + /** Get the state value if it exists */ + def get(): Iterator[S] + + /** Get the list value as an option if it exists and None otherwise */ + def getOption(): Option[Iterator[S]] + + /** Update the value of the list. */ + def put(newState: Array[S]): Unit + + /** Append an entry to the list */ + def appendValue(newState: S): Unit + + /** Append an entire list to the existing value */ + def appendList(newState: Array[S]): Unit + + /** Remove this state. */ + def remove(): Unit Review Comment: Should we rename this to `clear` ? I guess `remove` would be removing an element from the set of list values ? 
Re: [PR] [SPARK-43829][CONNECT] Improve SparkConnectPlanner by reuse Dataset and avoid construct new Dataset [spark]
hvanhovell commented on code in PR #43473: URL: https://github.com/apache/spark/pull/43473#discussion_r1480406839 ## connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala: ## @@ -115,6 +115,49 @@ class SparkConnectPlanner( private lazy val pythonExec = sys.env.getOrElse("PYSPARK_PYTHON", sys.env.getOrElse("PYSPARK_DRIVER_PYTHON", "python3")) + // Some relation transform need to create Dataset, then get the logical plan from the Dataset. + // This method used to reuse the Dataset instead to discard it. + def transformRelationAsDataset(rel: proto.Relation): Dataset[Row] = { Review Comment: Is this something you want to work on?
Re: [PR] [SPARK-43829][CONNECT] Improve SparkConnectPlanner by reuse Dataset and avoid construct new Dataset [spark]
hvanhovell commented on code in PR #43473: URL: https://github.com/apache/spark/pull/43473#discussion_r1480405679 ## connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala: ## @@ -115,6 +115,49 @@ class SparkConnectPlanner( private lazy val pythonExec = sys.env.getOrElse("PYSPARK_PYTHON", sys.env.getOrElse("PYSPARK_DRIVER_PYTHON", "python3")) + // Some relation transform need to create Dataset, then get the logical plan from the Dataset. + // This method used to reuse the Dataset instead to discard it. + def transformRelationAsDataset(rel: proto.Relation): Dataset[Row] = { Review Comment: TBH I want to move away from constructing Datasets wholesale. In many cases there is no real need, and it is also expensive to do.
Re: [PR] [SPARK-28346][SQL] clone the query plan between analyzer, optimizer and planner [spark]
dongjoon-hyun commented on PR #25111: URL: https://github.com/apache/spark/pull/25111#issuecomment-1930461904 @MasterDDT, I'd recommend filing an official JIRA issue. Otherwise, it will be difficult to get any further discussion or help, because this thread is too old.