Re: [PR] [SPARK-45762][CORE] Support shuffle managers defined in user jars by changing startup order [spark]

2024-02-06 Thread via GitHub


mridulm commented on PR #43627:
URL: https://github.com/apache/spark/pull/43627#issuecomment-1931474172

   @dongjoon-hyun, it is `@DeveloperApi` from the point of view of usage - 
`SparkEnv` is not expected to be created by users, as some of the constructor 
parameters are not externally visible (`RpcEnv`, for example, cannot be created 
as it is `private[spark]`). There have been changes to its constructor in the 
past as well, after it was marked `@DeveloperApi` - though, to be fair, those 
were a while back.
   
   In general, I am conflicted about trying to preserve compatibility for 
things which are clearly private to Spark - it inhibits the project's ability 
to evolve, especially around major version boundaries (though we do have a lot 
of these instances where we try to maintain compatibility).
   
   Given how long `SparkEnv` has been around, I can see a valid case being made 
for adding a constructor which preserves the earlier signature. Thoughts, 
@tgravescs?
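   
   A generic illustration (hypothetical class and fields, not `SparkEnv`'s real signature) of how an auxiliary constructor can preserve an earlier signature after the primary constructor gains a parameter:
   
   ```scala
   // Hypothetical example: callers compiled against the old one-argument
   // constructor keep working after `isLocal` is added to the primary one.
   class Env(val conf: Map[String, String], val isLocal: Boolean) {
     // Old signature preserved; forwards to the new primary constructor.
     def this(conf: Map[String, String]) = this(conf, isLocal = false)
   }
   ```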





Re: [PR] [SPARK-46865][SS] Add Batch Support for TransformWithState Operator [spark]

2024-02-06 Thread via GitHub


anishshri-db commented on code in PR #44884:
URL: https://github.com/apache/spark/pull/44884#discussion_r1480989833


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##
@@ -155,23 +163,114 @@ case class TransformWithStateExec(
   override protected def doExecute(): RDD[InternalRow] = {
 metrics // force lazy init at driver
 
-child.execute().mapPartitionsWithStateStore[InternalRow](
-  getStateInfo,
-  schemaForKeyRow,
-  schemaForValueRow,
-  numColsPrefixKey = 0,
-  session.sqlContext.sessionState,
-  Some(session.sqlContext.streams.stateStoreCoordinator),
-  useColumnFamilies = true
-) {
-  case (store: StateStore, singleIterator: Iterator[InternalRow]) =>
-val processorHandle = new StatefulProcessorHandleImpl(store, 
getStateInfo.queryRunId,
-  keyEncoder)
-assert(processorHandle.getHandleState == 
StatefulProcessorHandleState.CREATED)
-statefulProcessor.init(processorHandle, outputMode)
-
processorHandle.setHandleState(StatefulProcessorHandleState.INITIALIZED)
-val result = processDataWithPartition(singleIterator, store, 
processorHandle)
-result
+val broadcastedHadoopConf =

Review Comment:
   +1






[PR] [SPARK-43117][CONNECT] Make `ProtoUtils.abbreviate` support repeated fields [spark]

2024-02-06 Thread via GitHub


zhengruifeng opened a new pull request, #45056:
URL: https://github.com/apache/spark/pull/45056

   ### What changes were proposed in this pull request?
   Make `ProtoUtils.abbreviate` support repeated fields
   
   
   ### Why are the changes needed?
   The existing implementation does not work for repeated fields (strings/messages).
   
   We don't have `repeated bytes` in Spark Connect for now, so it is left alone. A rough sketch of the idea is included below.
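   
   A rough sketch (not the actual `ProtoUtils.abbreviate` code; `maxLen` and the truncation marker are illustrative assumptions) of how repeated string fields can be abbreviated with the protobuf Java API:
   
   ```scala
   import scala.jdk.CollectionConverters._
   
   import com.google.protobuf.Descriptors.FieldDescriptor
   import com.google.protobuf.Message
   
   def abbreviateRepeatedStrings(msg: Message, maxLen: Int): Message = {
     val builder = msg.toBuilder
     msg.getDescriptorForType.getFields.asScala
       .filter(f => f.isRepeated && f.getJavaType == FieldDescriptor.JavaType.STRING)
       .foreach { field =>
         (0 until msg.getRepeatedFieldCount(field)).foreach { i =>
           val value = msg.getRepeatedField(field, i).asInstanceOf[String]
           if (value.length > maxLen) {
             // keep a prefix of each element and record the original size
             builder.setRepeatedField(
               field, i, value.take(maxLen) + s"[truncated(size=${value.length})]")
           }
         }
       }
     builder.build()
   }
   ```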
   
   ### Does this PR introduce _any_ user-facing change?
   no
   
   
   ### How was this patch tested?
   added UTs
   
   ### Was this patch authored or co-authored using generative AI tooling?
   no





Re: [PR] [SPARK-46996][SQL] Allow AQE coalesce final stage in SQL cached plan [spark]

2024-02-06 Thread via GitHub


cloud-fan commented on PR #45054:
URL: https://github.com/apache/spark/pull/45054#issuecomment-1931369315

   cc @yaooqinn 





[PR] [SPARK-46997][CORE] Enable `spark.worker.cleanup.enabled` by default [spark]

2024-02-06 Thread via GitHub


dongjoon-hyun opened a new pull request, #45055:
URL: https://github.com/apache/spark/pull/45055

   
   
   ### What changes were proposed in this pull request?
   
   
   
   ### Why are the changes needed?
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   
   ### How was this patch tested?
   
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   





Re: [PR] [SPARK-46865][SS] Add Batch Support for TransformWithState Operator [spark]

2024-02-06 Thread via GitHub


HeartSaVioR commented on code in PR #44884:
URL: https://github.com/apache/spark/pull/44884#discussion_r1480916745


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##
@@ -155,23 +163,114 @@ case class TransformWithStateExec(
   override protected def doExecute(): RDD[InternalRow] = {
 metrics // force lazy init at driver
 
-child.execute().mapPartitionsWithStateStore[InternalRow](
-  getStateInfo,
-  schemaForKeyRow,
-  schemaForValueRow,
-  numColsPrefixKey = 0,
-  session.sqlContext.sessionState,
-  Some(session.sqlContext.streams.stateStoreCoordinator),
-  useColumnFamilies = true
-) {
-  case (store: StateStore, singleIterator: Iterator[InternalRow]) =>
-val processorHandle = new StatefulProcessorHandleImpl(store, 
getStateInfo.queryRunId,
-  keyEncoder)
-assert(processorHandle.getHandleState == 
StatefulProcessorHandleState.CREATED)
-statefulProcessor.init(processorHandle, outputMode)
-
processorHandle.setHandleState(StatefulProcessorHandleState.INITIALIZED)
-val result = processDataWithPartition(singleIterator, store, 
processorHandle)
-result
+val broadcastedHadoopConf =
+  new SerializableConfiguration(session.sessionState.newHadoopConf())
+if (isStreaming) {
+  child.execute().mapPartitionsWithStateStore[InternalRow](
+getStateInfo,
+schemaForKeyRow,
+schemaForValueRow,
+numColsPrefixKey = 0,
+session.sqlContext.sessionState,
+Some(session.sqlContext.streams.stateStoreCoordinator),
+useColumnFamilies = true
+  ) {
+case (store: StateStore, singleIterator: Iterator[InternalRow]) =>
+  processData(store, singleIterator)
+  }
+} else {
+  // If the query is running in batch mode, we need to create a new 
StateStore and instantiate
+  // a temp directory on the executors in mapPartitionsWithIndex.
+  child.execute().mapPartitionsWithIndex[InternalRow](
+(i, iter) => {
+  val providerId = {
+// lazy creation to initialize tempDirPath once

Review Comment:
   nit: remove the comment to be in sync






Re: [PR] [SPARK-46865][SS] Add Batch Support for TransformWithState Operator [spark]

2024-02-06 Thread via GitHub


HeartSaVioR commented on code in PR #44884:
URL: https://github.com/apache/spark/pull/44884#discussion_r1480916290


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##
@@ -155,23 +163,114 @@ case class TransformWithStateExec(
   override protected def doExecute(): RDD[InternalRow] = {
 metrics // force lazy init at driver
 
-child.execute().mapPartitionsWithStateStore[InternalRow](
-  getStateInfo,
-  schemaForKeyRow,
-  schemaForValueRow,
-  numColsPrefixKey = 0,
-  session.sqlContext.sessionState,
-  Some(session.sqlContext.streams.stateStoreCoordinator),
-  useColumnFamilies = true
-) {
-  case (store: StateStore, singleIterator: Iterator[InternalRow]) =>
-val processorHandle = new StatefulProcessorHandleImpl(store, 
getStateInfo.queryRunId,
-  keyEncoder)
-assert(processorHandle.getHandleState == 
StatefulProcessorHandleState.CREATED)
-statefulProcessor.init(processorHandle, outputMode)
-
processorHandle.setHandleState(StatefulProcessorHandleState.INITIALIZED)
-val result = processDataWithPartition(singleIterator, store, 
processorHandle)
-result
+val broadcastedHadoopConf =

Review Comment:
   This is not needed for streaming - let's do this just before L184.






Re: [PR] [SPARK-46865][SS] Add Batch Support for TransformWithState Operator [spark]

2024-02-06 Thread via GitHub


ericm-db commented on PR #44884:
URL: https://github.com/apache/spark/pull/44884#issuecomment-1931304594

   cc @HeartSaVioR 





Re: [PR] [SPARK-46865][SS] Add Batch Support for TransformWithState Operator [spark]

2024-02-06 Thread via GitHub


ericm-db commented on code in PR #44884:
URL: https://github.com/apache/spark/pull/44884#discussion_r1480908781


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##
@@ -155,23 +161,112 @@ case class TransformWithStateExec(
   override protected def doExecute(): RDD[InternalRow] = {
 metrics // force lazy init at driver
 
-child.execute().mapPartitionsWithStateStore[InternalRow](
-  getStateInfo,
-  schemaForKeyRow,
-  schemaForValueRow,
-  numColsPrefixKey = 0,
-  session.sqlContext.sessionState,
-  Some(session.sqlContext.streams.stateStoreCoordinator),
-  useColumnFamilies = true
-) {
-  case (store: StateStore, singleIterator: Iterator[InternalRow]) =>
-val processorHandle = new StatefulProcessorHandleImpl(store, 
getStateInfo.queryRunId,
-  keyEncoder)
-assert(processorHandle.getHandleState == 
StatefulProcessorHandleState.CREATED)
-statefulProcessor.init(processorHandle, outputMode)
-
processorHandle.setHandleState(StatefulProcessorHandleState.INITIALIZED)
-val result = processDataWithPartition(singleIterator, store, 
processorHandle)
-result
+if (isStreaming) {
+  child.execute().mapPartitionsWithStateStore[InternalRow](
+getStateInfo,
+schemaForKeyRow,
+schemaForValueRow,
+numColsPrefixKey = 0,
+session.sqlContext.sessionState,
+Some(session.sqlContext.streams.stateStoreCoordinator),
+useColumnFamilies = true
+  ) {
+case (store: StateStore, singleIterator: Iterator[InternalRow]) =>
+  processData(store, singleIterator)
+  }
+} else {
+  // If the query is running in batch mode, we need to create a new 
StateStore and instantiate
+  // a temp directory on the executors in mapPartitionsWithIndex.
+  child.execute().mapPartitionsWithIndex[InternalRow](
+(i, iter) => {
+  val providerId = {
+// lazy creation to initialize tempDirPath once
+lazy val tempDirPath = Utils.createTempDir().getAbsolutePath

Review Comment:
   Removed






Re: [PR] [SPARK-45599][CORE] Use object equality in OpenHashSet [spark]

2024-02-06 Thread via GitHub


cloud-fan commented on code in PR #45036:
URL: https://github.com/apache/spark/pull/45036#discussion_r1480904118


##
core/src/test/scala/org/apache/spark/util/collection/OpenHashSetSuite.scala:
##
@@ -269,4 +269,35 @@ class OpenHashSetSuite extends SparkFunSuite with Matchers 
{
   assert(pos1 == pos2)
 }
   }
+
+  test("SPARK-45599: 0.0 and -0.0 are equal but not the same") {

Review Comment:
   This is a bit tricky and it's better if we can find a reference system that 
defines this semantic. In Spark, `0.0 == -0.0`, and in GROUP BY, 0.0 and -0.0 
are considered to be in the same group and normalized to 0.0.
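   
   For context, a minimal Scala illustration of the two notions of equality being discussed:
   
   ```scala
   // Primitive comparison: 0.0 and -0.0 compare equal.
   println(0.0 == -0.0)  // true
   
   // Object equality distinguishes them (java.lang.Double.equals compares bit patterns).
   println(java.lang.Double.valueOf(0.0).equals(java.lang.Double.valueOf(-0.0)))  // false
   
   // The underlying bit patterns differ only in the sign bit.
   println(java.lang.Double.doubleToRawLongBits(0.0))   // 0
   println(java.lang.Double.doubleToRawLongBits(-0.0))  // -9223372036854775808
   ```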






Re: [PR] [SPARK-46996][SQL] Allow AQE coalesce final stage in SQL cached plan [spark]

2024-02-06 Thread via GitHub


cloud-fan commented on code in PR #45054:
URL: https://github.com/apache/spark/pull/45054#discussion_r1480902456


##
sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala:
##
@@ -1630,20 +1630,37 @@ class CachedTableSuite extends QueryTest with 
SQLTestUtils
   SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1",
   SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
 
-  var finalPlan = ""
+  var finalPlan: SparkPlanInfo = null
   val listener = new SparkListener {
 override def onOtherEvent(event: SparkListenerEvent): Unit = {
   event match {
-case SparkListenerSQLAdaptiveExecutionUpdate(_, physicalPlanDesc, 
sparkPlanInfo) =>
+case SparkListenerSQLAdaptiveExecutionUpdate(_, _, sparkPlanInfo) 
=>
   if (sparkPlanInfo.simpleString.startsWith(
   "AdaptiveSparkPlan isFinalPlan=true")) {
-finalPlan = physicalPlanDesc
+finalPlan = sparkPlanInfo
   }
 case _ => // ignore other events
   }
 }
   }
 
+  def findNodeInSparkPlanInfo(root: SparkPlanInfo, cond: SparkPlanInfo => 
Boolean):
+  Option[SparkPlanInfo] = {
+if (cond(root)) {
+  Some(root)
+} else {
+  root.children.flatMap(findNodeInSparkPlanInfo(_, cond)).headOption
+}
+  }
+
+  def cachedFinalStageCoalesced(sparkPlanInfo: SparkPlanInfo): Boolean = {
+val inMemoryScanNode = findNodeInSparkPlanInfo(sparkPlanInfo,
+  _.nodeName.contains("TableCacheQueryStage"))
+val resultNode = findNodeInSparkPlanInfo(inMemoryScanNode.get,
+  _.nodeName.contains("ResultQueryStage"))

Review Comment:
   there is no `ResultQueryStage` in Spark.






Re: [PR] [SPARK-46962][SS][PYTHON] Implement python worker to run python streaming data source [spark]

2024-02-06 Thread via GitHub


HeartSaVioR commented on code in PR #45023:
URL: https://github.com/apache/spark/pull/45023#discussion_r1480902173


##
python/pyspark/sql/datasource.py:
##
@@ -298,6 +320,97 @@ def read(self, partition: InputPartition) -> 
Iterator[Union[Tuple, Row]]:
 ...
 
 
+class DataSourceStreamReader(ABC):

Review Comment:
   Small hint: the majority of the methods we are introducing in the Python 
streaming data source have corresponding methods in the Scala API. You are OK 
to copy the same content, with modifications to reflect any differences in the 
contract.






Re: [PR] [SPARK-46996][SQL] Allow AQE coalesce final stage in SQL cached plan [spark]

2024-02-06 Thread via GitHub


cloud-fan commented on code in PR #45054:
URL: https://github.com/apache/spark/pull/45054#discussion_r1480901308


##
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##
@@ -1561,11 +1561,11 @@ object SQLConf {
   .doc("Whether to forcibly enable some optimization rules that can change 
the output " +
 "partitioning of a cached query when executing it for caching. If it 
is set to true, " +
 "queries may need an extra shuffle to read the cached data. This 
configuration is " +
-"enabled by default. The optimization rules enabled by this 
configuration " +
-s"are ${ADAPTIVE_EXECUTION_ENABLED.key} and 
${AUTO_BUCKETED_SCAN_ENABLED.key}.")
+"enabled by default. The optimization rule enabled by this 
configuration " +

Review Comment:
   ```suggestion
   "disabled by default. The optimization rule enabled by this 
configuration " +
   ```






Re: [PR] [SPARK-46996][SQL] Allow AQE coalesce final stage in SQL cached plan [spark]

2024-02-06 Thread via GitHub


liuzqt commented on PR #45054:
URL: https://github.com/apache/spark/pull/45054#issuecomment-1931275542

   @cloud-fan @maryannxue Please help review this change, thanks!





[PR] [SPARK-46996][SQL] Allow AQE coalesce final stage in SQL cached plan [spark]

2024-02-06 Thread via GitHub


liuzqt opened a new pull request, #45054:
URL: https://github.com/apache/spark/pull/45054

   
   ### What changes were proposed in this pull request?
   
   https://github.com/apache/spark/pull/43435 and 
https://github.com/apache/spark/pull/43760 fix a correctness issue that is 
triggered when AQE is applied to a cached query plan, specifically when AQE 
coalesces the final result stage of the cached plan.
   
   The current semantics of 
`spark.sql.optimizer.canChangeCachedPlanOutputPartitioning`
   ([source 
code](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala#L403-L411)):
   
   - when true, we enable AQE but disable coalescing the final stage (default)
   - when false, we disable AQE
   
   But let's revisit the semantics of this config: for the caller, the only 
thing that matters is whether we change the output partitioning of the cached 
plan, and we should just try to apply AQE whenever possible. Thus we want to 
change the semantics of `spark.sql.optimizer.canChangeCachedPlanOutputPartitioning`:
   
   - when true, we enable AQE and allow coalescing the final stage; this might 
lead to a perf regression because it introduces an extra shuffle
   - when false, we enable AQE but disable coalescing the final stage (this is 
actually the semantic of the old `true` behavior)
   
   Also, to keep the default behavior unchanged, we might want to flip the 
default value of `spark.sql.optimizer.canChangeCachedPlanOutputPartitioning` to 
`false`. A brief usage sketch follows.
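   
   A minimal usage sketch of the proposed semantics (assuming a `spark-shell` session where `spark` is in scope; the config key is the one discussed in this PR, everything else is illustrative):
   
   ```scala
   // Opt in: AQE may now coalesce the final stage of the cached plan, which can
   // change its output partitioning (and add an extra shuffle for readers that
   // relied on the previous partitioning).
   spark.conf.set("spark.sql.optimizer.canChangeCachedPlanOutputPartitioning", "true")
   
   import org.apache.spark.sql.functions.col
   
   val cached = spark.range(0, 1000000)
     .groupBy((col("id") % 10).as("bucket"))
     .count()
     .cache()
   
   cached.count()  // materializes the cache; the final stage may be coalesced
   ```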
   
   ### Why are the changes needed?
   
   To allow AQE to coalesce the final stage in a SQL cached plan, and to make 
the semantics of `spark.sql.optimizer.canChangeCachedPlanOutputPartitioning` 
more reasonable.
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   
   ### How was this patch tested?
   
   Updated UTs.
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   No
   





Re: [PR] [SPARK-45599][CORE] Use object equality in OpenHashSet [spark]

2024-02-06 Thread via GitHub


HeartSaVioR commented on PR #45036:
URL: https://github.com/apache/spark/pull/45036#issuecomment-1931270883

   I have no expertise in this area, so I will leave the decisions on which 
version(s) to merge the change into, and on whether we want to safeguard the 
behavioral change by introducing an escape hatch, to others. I can hold off on 
cutting the RC1 tag for this.





Re: [PR] [SPARK-46962][SS][PYTHON] Implement python worker to run python streaming data source [spark]

2024-02-06 Thread via GitHub


HeartSaVioR commented on code in PR #45023:
URL: https://github.com/apache/spark/pull/45023#discussion_r1480879427


##
sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonStreamingSourceRunner.scala:
##
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.spark.sql.execution.python
+
+import java.io.{BufferedInputStream, BufferedOutputStream, DataInputStream, 
DataOutputStream}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.jdk.CollectionConverters._
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.api.python.{PythonFunction, PythonWorker, 
PythonWorkerFactory, PythonWorkerUtils, SpecialLengths}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.BUFFER_SIZE
+import org.apache.spark.internal.config.Python.{PYTHON_AUTH_SOCKET_TIMEOUT, 
PYTHON_USE_DAEMON}
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+
+object PythonStreamingSourceRunner {

Review Comment:
   Can we please document the protocol between this runner and the driver? 
This is a new worker specialized for the Python streaming source - let's not 
force code readers to figure it out by themselves.






Re: [PR] [SPARK-46526][SQL] Support LIMIT over correlated subqueries where predicates only reference outer table [spark]

2024-02-06 Thread via GitHub


cloud-fan closed pull request #44514: [SPARK-46526][SQL] Support LIMIT over 
correlated subqueries where predicates only reference outer table
URL: https://github.com/apache/spark/pull/44514





Re: [PR] [SPARK-46526][SQL] Support LIMIT over correlated subqueries where predicates only reference outer table [spark]

2024-02-06 Thread via GitHub


cloud-fan commented on PR #44514:
URL: https://github.com/apache/spark/pull/44514#issuecomment-1931233168

   thanks, merging to master!





Re: [PR] [SPARK-46975][PS] Move `to_{hdf, feather, stata}` to the fallback list [spark]

2024-02-06 Thread via GitHub


HyukjinKwon commented on code in PR #45026:
URL: https://github.com/apache/spark/pull/45026#discussion_r1480858948


##
python/pyspark/pandas/frame.py:
##
@@ -2648,123 +2648,6 @@ def to_latex(
 psdf._to_internal_pandas(), self.to_latex, pd.DataFrame.to_latex, 
args
 )
 
-def to_feather(
-self,
-path: Union[str, IO[str]],
-**kwargs: Any,
-) -> None:
-"""
-Write a DataFrame to the binary Feather format.
-
-.. note:: This method should only be used if the resulting DataFrame 
is expected
-  to be small, as all the data is loaded into the driver's 
memory.
-
-.. versionadded:: 4.0.0
-
-Parameters
---
-path : str, path object, file-like object
-String, path object (implementing ``os.PathLike[str]``), or 
file-like
-object implementing a binary ``write()`` function.
-**kwargs :
-Additional keywords passed to 
:func:`pyarrow.feather.write_feather`.
-This includes the `compression`, `compression_level`, `chunksize`
-and `version` keywords.
-
-Examples
-
->>> df = ps.DataFrame([[1, 2, 3], [4, 5, 6]])
->>> df.to_feather("file.feather")  # doctest: +SKIP
-"""
-# Make sure locals() call is at the top of the function so we don't 
capture local variables.
-args = locals()
-
-return validate_arguments_and_invoke_function(
-self._to_internal_pandas(), self.to_feather, 
pd.DataFrame.to_feather, args
-)
-
-def to_stata(
-self,
-path: Union[str, IO[str]],
-*,
-convert_dates: Optional[Dict] = None,
-write_index: bool = True,
-byteorder: Optional[str] = None,
-time_stamp: Optional[datetime.datetime] = None,
-data_label: Optional[str] = None,
-variable_labels: Optional[Dict] = None,
-version: Optional[int] = 114,
-convert_strl: Optional[Sequence[Name]] = None,
-compression: str = "infer",
-storage_options: Optional[str] = None,
-value_labels: Optional[Dict] = None,
-) -> None:
-"""
-Export DataFrame object to Stata dta format.
-
-.. note:: This method should only be used if the resulting DataFrame 
is expected
-  to be small, as all the data is loaded into the driver's 
memory.
-
-.. versionadded:: 4.0.0
-
-Parameters
---
-path : str, path object, or buffer
-String, path object (implementing ``os.PathLike[str]``), or 
file-like
-object implementing a binary ``write()`` function.
-convert_dates : dict
-Dictionary mapping columns containing datetime types to stata
-internal format to use when writing the dates. Options are 'tc',
-'td', 'tm', 'tw', 'th', 'tq', 'ty'. Column can be either an integer
-or a name. Datetime columns that do not have a conversion type
-specified will be converted to 'tc'. Raises NotImplementedError if
-a datetime column has timezone information.
-write_index : bool
-Write the index to Stata dataset.
-byteorder : str
-Can be ">", "<", "little", or "big". default is `sys.byteorder`.
-time_stamp : datetime
-A datetime to use as file creation date.  Default is the current
-time.
-data_label : str, optional
-A label for the data set.  Must be 80 characters or smaller.
-variable_labels : dict
-Dictionary containing columns as keys and variable labels as
-values. Each label must be 80 characters or smaller.
-version : {{114, 117, 118, 119, None}}, default 114
-Version to use in the output dta file. Set to None to let pandas
-decide between 118 or 119 formats depending on the number of
-columns in the frame. Version 114 can be read by Stata 10 and
-later. Version 117 can be read by Stata 13 or later. Version 118
-is supported in Stata 14 and later. Version 119 is supported in
-Stata 15 and later. Version 114 limits string variables to 244
-characters or fewer while versions 117 and later allow strings
-with lengths up to 2,000,000 characters. Versions 118 and 119
-support Unicode characters, and version 119 supports more than
-32,767 variables.
-convert_strl : list, optional
-List of column names to convert to string columns to Stata StrL
-format. Only available if version is 117.  Storing strings in the
-StrL format can produce smaller dta files if strings have more than
-8 characters and values are repeated.
-value_labels : dict of dicts
-Dictionary containing columns as keys and 

Re: [PR] [SPARK-46865][SS] Add Batch Support for TransformWithState Operator [spark]

2024-02-06 Thread via GitHub


HeartSaVioR commented on code in PR #44884:
URL: https://github.com/apache/spark/pull/44884#discussion_r1480841542


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##
@@ -155,23 +161,112 @@ case class TransformWithStateExec(
   override protected def doExecute(): RDD[InternalRow] = {
 metrics // force lazy init at driver
 
-child.execute().mapPartitionsWithStateStore[InternalRow](
-  getStateInfo,
-  schemaForKeyRow,
-  schemaForValueRow,
-  numColsPrefixKey = 0,
-  session.sqlContext.sessionState,
-  Some(session.sqlContext.streams.stateStoreCoordinator),
-  useColumnFamilies = true
-) {
-  case (store: StateStore, singleIterator: Iterator[InternalRow]) =>
-val processorHandle = new StatefulProcessorHandleImpl(store, 
getStateInfo.queryRunId,
-  keyEncoder)
-assert(processorHandle.getHandleState == 
StatefulProcessorHandleState.CREATED)
-statefulProcessor.init(processorHandle, outputMode)
-
processorHandle.setHandleState(StatefulProcessorHandleState.INITIALIZED)
-val result = processDataWithPartition(singleIterator, store, 
processorHandle)
-result
+if (isStreaming) {
+  child.execute().mapPartitionsWithStateStore[InternalRow](
+getStateInfo,
+schemaForKeyRow,
+schemaForValueRow,
+numColsPrefixKey = 0,
+session.sqlContext.sessionState,
+Some(session.sqlContext.streams.stateStoreCoordinator),
+useColumnFamilies = true
+  ) {
+case (store: StateStore, singleIterator: Iterator[InternalRow]) =>
+  processData(store, singleIterator)
+  }
+} else {
+  // If the query is running in batch mode, we need to create a new 
StateStore and instantiate
+  // a temp directory on the executors in mapPartitionsWithIndex.
+  child.execute().mapPartitionsWithIndex[InternalRow](
+(i, iter) => {
+  val providerId = {
+// lazy creation to initialize tempDirPath once
+lazy val tempDirPath = Utils.createTempDir().getAbsolutePath

Review Comment:
   Please see the question again - my question was "will this be evaluated 
once per executor **after serde**?". If so, this is a good optimization; 
otherwise the lazy val does nothing and just confuses people.



##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##
@@ -155,23 +161,112 @@ case class TransformWithStateExec(
   override protected def doExecute(): RDD[InternalRow] = {
 metrics // force lazy init at driver
 
-child.execute().mapPartitionsWithStateStore[InternalRow](
-  getStateInfo,
-  schemaForKeyRow,
-  schemaForValueRow,
-  numColsPrefixKey = 0,
-  session.sqlContext.sessionState,
-  Some(session.sqlContext.streams.stateStoreCoordinator),
-  useColumnFamilies = true
-) {
-  case (store: StateStore, singleIterator: Iterator[InternalRow]) =>
-val processorHandle = new StatefulProcessorHandleImpl(store, 
getStateInfo.queryRunId,
-  keyEncoder)
-assert(processorHandle.getHandleState == 
StatefulProcessorHandleState.CREATED)
-statefulProcessor.init(processorHandle, outputMode)
-
processorHandle.setHandleState(StatefulProcessorHandleState.INITIALIZED)
-val result = processDataWithPartition(singleIterator, store, 
processorHandle)
-result
+if (isStreaming) {
+  child.execute().mapPartitionsWithStateStore[InternalRow](
+getStateInfo,
+schemaForKeyRow,
+schemaForValueRow,
+numColsPrefixKey = 0,
+session.sqlContext.sessionState,
+Some(session.sqlContext.streams.stateStoreCoordinator),
+useColumnFamilies = true
+  ) {
+case (store: StateStore, singleIterator: Iterator[InternalRow]) =>
+  processData(store, singleIterator)
+  }
+} else {
+  // If the query is running in batch mode, we need to create a new 
StateStore and instantiate
+  // a temp directory on the executors in mapPartitionsWithIndex.
+  child.execute().mapPartitionsWithIndex[InternalRow](
+(i, iter) => {
+  val providerId = {
+// lazy creation to initialize tempDirPath once
+lazy val tempDirPath = Utils.createTempDir().getAbsolutePath

Review Comment:
   Please see the question again - my question was "will this be evaluated 
once per executor **_after serde_**?". If so, this is a good optimization; 
otherwise the lazy val does nothing and just confuses people.






Re: [PR] [SPARK-46865][SS] Add Batch Support for TransformWithState Operator [spark]

2024-02-06 Thread via GitHub


anishshri-db commented on code in PR #44884:
URL: https://github.com/apache/spark/pull/44884#discussion_r1480850291


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##
@@ -155,23 +161,112 @@ case class TransformWithStateExec(
   override protected def doExecute(): RDD[InternalRow] = {
 metrics // force lazy init at driver
 
-child.execute().mapPartitionsWithStateStore[InternalRow](
-  getStateInfo,
-  schemaForKeyRow,
-  schemaForValueRow,
-  numColsPrefixKey = 0,
-  session.sqlContext.sessionState,
-  Some(session.sqlContext.streams.stateStoreCoordinator),
-  useColumnFamilies = true
-) {
-  case (store: StateStore, singleIterator: Iterator[InternalRow]) =>
-val processorHandle = new StatefulProcessorHandleImpl(store, 
getStateInfo.queryRunId,
-  keyEncoder)
-assert(processorHandle.getHandleState == 
StatefulProcessorHandleState.CREATED)
-statefulProcessor.init(processorHandle, outputMode)
-
processorHandle.setHandleState(StatefulProcessorHandleState.INITIALIZED)
-val result = processDataWithPartition(singleIterator, store, 
processorHandle)
-result
+if (isStreaming) {
+  child.execute().mapPartitionsWithStateStore[InternalRow](
+getStateInfo,
+schemaForKeyRow,
+schemaForValueRow,
+numColsPrefixKey = 0,
+session.sqlContext.sessionState,
+Some(session.sqlContext.streams.stateStoreCoordinator),
+useColumnFamilies = true
+  ) {
+case (store: StateStore, singleIterator: Iterator[InternalRow]) =>
+  processData(store, singleIterator)
+  }
+} else {
+  // If the query is running in batch mode, we need to create a new 
StateStore and instantiate
+  // a temp directory on the executors in mapPartitionsWithIndex.
+  child.execute().mapPartitionsWithIndex[InternalRow](
+(i, iter) => {
+  val providerId = {
+// lazy creation to initialize tempDirPath once
+lazy val tempDirPath = Utils.createTempDir().getAbsolutePath

Review Comment:
   Ah ok - gotcha, thx! Yeah, agreed - if it's not evaluated only once, we 
might as well remove the `lazy` portion. Maybe we should just remove it anyway 
- it will become easier to read, I feel.
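   
   To illustrate the point in question, a small sketch (not the PR's code) of how `lazy val` placement changes when the temp directory gets created once the closure is deserialized on executors:
   
   ```scala
   import org.apache.spark.rdd.RDD
   
   object TempDirHolder {
     // Initialized at most once per executor JVM, on first access after deserialization.
     lazy val sharedTempDirPath: String =
       java.nio.file.Files.createTempDirectory("tws-batch-").toString
   }
   
   def illustrate(rdd: RDD[Int]): RDD[Int] = {
     rdd.mapPartitionsWithIndex { (partitionId, iter) =>
       // The closure body runs once per partition, so a local lazy val is
       // re-initialized for every partition - no once-per-executor guarantee.
       lazy val perPartitionTempDir =
         java.nio.file.Files.createTempDirectory(s"partition-$partitionId-").toString
       println(s"partition $partitionId: shared=${TempDirHolder.sharedTempDirPath} " +
         s"local=$perPartitionTempDir")
       iter
     }
   }
   ```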






Re: [PR] [SPARK-46865][SS] Add Batch Support for TransformWithState Operator [spark]

2024-02-06 Thread via GitHub


HeartSaVioR commented on code in PR #44884:
URL: https://github.com/apache/spark/pull/44884#discussion_r1480841542


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##
@@ -155,23 +161,112 @@ case class TransformWithStateExec(
   override protected def doExecute(): RDD[InternalRow] = {
 metrics // force lazy init at driver
 
-child.execute().mapPartitionsWithStateStore[InternalRow](
-  getStateInfo,
-  schemaForKeyRow,
-  schemaForValueRow,
-  numColsPrefixKey = 0,
-  session.sqlContext.sessionState,
-  Some(session.sqlContext.streams.stateStoreCoordinator),
-  useColumnFamilies = true
-) {
-  case (store: StateStore, singleIterator: Iterator[InternalRow]) =>
-val processorHandle = new StatefulProcessorHandleImpl(store, 
getStateInfo.queryRunId,
-  keyEncoder)
-assert(processorHandle.getHandleState == 
StatefulProcessorHandleState.CREATED)
-statefulProcessor.init(processorHandle, outputMode)
-
processorHandle.setHandleState(StatefulProcessorHandleState.INITIALIZED)
-val result = processDataWithPartition(singleIterator, store, 
processorHandle)
-result
+if (isStreaming) {
+  child.execute().mapPartitionsWithStateStore[InternalRow](
+getStateInfo,
+schemaForKeyRow,
+schemaForValueRow,
+numColsPrefixKey = 0,
+session.sqlContext.sessionState,
+Some(session.sqlContext.streams.stateStoreCoordinator),
+useColumnFamilies = true
+  ) {
+case (store: StateStore, singleIterator: Iterator[InternalRow]) =>
+  processData(store, singleIterator)
+  }
+} else {
+  // If the query is running in batch mode, we need to create a new 
StateStore and instantiate
+  // a temp directory on the executors in mapPartitionsWithIndex.
+  child.execute().mapPartitionsWithIndex[InternalRow](
+(i, iter) => {
+  val providerId = {
+// lazy creation to initialize tempDirPath once
+lazy val tempDirPath = Utils.createTempDir().getAbsolutePath

Review Comment:
   Please see the question again - my question was "will this be evaluated 
once per executor?". If so, this is a good optimization; otherwise the lazy 
val does nothing and just confuses people.






Re: [PR] [SPARK-46641][SS] Add maxBytesPerTrigger threshold [spark]

2024-02-06 Thread via GitHub


MaxNevermind commented on PR #44636:
URL: https://github.com/apache/spark/pull/44636#issuecomment-1931178064

   Pushed another commit; one issue was resolved.
   @viirya 
   Please check out the solution for the last remaining issue proposed above.
   





Re: [PR] [SPARK-46641][SS] Add maxBytesPerTrigger threshold [spark]

2024-02-06 Thread via GitHub


MaxNevermind commented on code in PR #44636:
URL: https://github.com/apache/spark/pull/44636#discussion_r1480827987


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala:
##
@@ -166,6 +197,25 @@ class FileStreamSource(
 // implies "sourceOptions.latestFirst = true" which we want to refresh 
the list per batch
 (newFiles.take(files.maxFiles()), null)
 
+  case files: ReadMaxBytes if !sourceOptions.latestFirst =>
+// we can cache and reuse remaining fetched list of files in further 
batches
+val (FilesSplit(bFiles, _), FilesSplit(usFiles, rSize)) =
+  takeFilesUntilMax(newFiles, files.maxBytes())
+if (rSize < (files.maxBytes() * DISCARD_UNSEEN_FILES_RATIO).toLong) {

Review Comment:
   @viirya 
   I've converted the BigInt to a Double, and now it is a comparison of two Doubles.
   Let me know if that is acceptable; we could also do:
   ```
   if (rSize < BigDecimal(files.maxBytes() * DISCARD_UNSEEN_INPUT_RATIO)) {
   ```
   The BigInt on the left would be converted to BigDecimal implicitly by the compiler.



##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala:
##
@@ -166,6 +197,25 @@ class FileStreamSource(
 // implies "sourceOptions.latestFirst = true" which we want to refresh 
the list per batch
 (newFiles.take(files.maxFiles()), null)
 
+  case files: ReadMaxBytes if !sourceOptions.latestFirst =>
+// we can cache and reuse remaining fetched list of files in further 
batches
+val (FilesSplit(bFiles, _), FilesSplit(usFiles, rSize)) =
+  takeFilesUntilMax(newFiles, files.maxBytes())
+if (rSize < (files.maxBytes() * DISCARD_UNSEEN_FILES_RATIO).toLong) {

Review Comment:
   @viirya 
   I've converted the BigInt to a Double, and now it is a comparison of two Doubles.
   Let me know if that is acceptable; we could also do:
   ```
   if (rSize < BigDecimal(files.maxBytes() * DISCARD_UNSEEN_INPUT_RATIO)) {
   ```
   The BigInt on the left would be converted to BigDecimal implicitly by the compiler.






Re: [PR] [SPARK-46641][SS] Add maxBytesPerTrigger threshold [spark]

2024-02-06 Thread via GitHub


MaxNevermind commented on code in PR #44636:
URL: https://github.com/apache/spark/pull/44636#discussion_r1480827987


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala:
##
@@ -166,6 +197,25 @@ class FileStreamSource(
 // implies "sourceOptions.latestFirst = true" which we want to refresh 
the list per batch
 (newFiles.take(files.maxFiles()), null)
 
+  case files: ReadMaxBytes if !sourceOptions.latestFirst =>
+// we can cache and reuse remaining fetched list of files in further 
batches
+val (FilesSplit(bFiles, _), FilesSplit(usFiles, rSize)) =
+  takeFilesUntilMax(newFiles, files.maxBytes())
+if (rSize < (files.maxBytes() * DISCARD_UNSEEN_FILES_RATIO).toLong) {

Review Comment:
   @viirya 
   I've converted the BigInt to a Double, and now it is a comparison of two Doubles.
   Let me know if that is acceptable; we could also do:
   ```
   if (rSize < BigDecimal(files.maxBytes() * DISCARD_UNSEEN_INPUT_RATIO)) {
   ```
   The BigInt on the left would be converted to BigDecimal implicitly by the compiler.
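   
   A small sketch of the comparison being discussed (names follow the diff above; the values are illustrative):
   
   ```scala
   // Compare the remaining unseen bytes against a fraction of maxBytesPerTrigger,
   // converting the BigInt side to Double so both sides of the comparison are Doubles.
   val maxBytes: Long = 1024L * 1024 * 1024          // 1 GiB per trigger (example)
   val DISCARD_UNSEEN_FILES_RATIO: Double = 0.2      // example threshold
   val rSize: BigInt = BigInt(150L * 1024 * 1024)    // bytes left in the unseen list
   
   if (rSize.toDouble < maxBytes * DISCARD_UNSEEN_FILES_RATIO) {
     println("discard the cached unseen files and re-list on the next batch")
   }
   ```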






Re: [PR] [SPARK-46812][SQL][PYTHON] Make mapInPandas / mapInArrow support ResourceProfile [spark]

2024-02-06 Thread via GitHub


wbo4958 commented on code in PR #44852:
URL: https://github.com/apache/spark/pull/44852#discussion_r1480821808


##
python/pyspark/sql/pandas/map_ops.py:
##
@@ -15,9 +15,14 @@
 # limitations under the License.
 #
 import sys
-from typing import Union, TYPE_CHECKING
+from typing import Union, TYPE_CHECKING, Optional
+
+from py4j.java_gateway import JavaObject
+
+from pyspark.resource.requests import ExecutorResourceRequests, 
TaskResourceRequests
 

Review Comment:
   Thx



##
python/pyspark/sql/pandas/map_ops.py:
##
@@ -175,6 +196,11 @@ def mapInArrow(
 
 .. versionadded: 3.5.0
 
+profile : :class:`pyspark.resource.ResourceProfile`. The optional 
ResourceProfile
+to be used for mapInPandas.

Review Comment:
   My bad, Thx very much.






Re: [PR] [SPARK-46987][CONNECT] `ProtoUtils.abbreviate` avoid unnecessary `setField` operation [spark]

2024-02-06 Thread via GitHub


LuciferYang commented on PR #45045:
URL: https://github.com/apache/spark/pull/45045#issuecomment-1931141290

   All tests passed. Merged into master for Spark 4.0. Thanks @zhengruifeng 
@dongjoon-hyun @xinrong-meng 





Re: [PR] [SPARK-46987][CONNECT] `ProtoUtils.abbreviate` avoid unnecessary `setField` operation [spark]

2024-02-06 Thread via GitHub


LuciferYang closed pull request #45045: [SPARK-46987][CONNECT] 
`ProtoUtils.abbreviate` avoid unnecessary `setField` operation
URL: https://github.com/apache/spark/pull/45045





Re: [PR] [SPARK-46975][PS] Move `to_{hdf, feather, stata}` to the fallback list [spark]

2024-02-06 Thread via GitHub


zhengruifeng commented on code in PR #45026:
URL: https://github.com/apache/spark/pull/45026#discussion_r1480807590


##
python/pyspark/pandas/frame.py:
##
@@ -2648,123 +2648,6 @@ def to_latex(
 psdf._to_internal_pandas(), self.to_latex, pd.DataFrame.to_latex, 
args
 )
 
-def to_feather(
-self,
-path: Union[str, IO[str]],
-**kwargs: Any,
-) -> None:
-"""
-Write a DataFrame to the binary Feather format.
-
-.. note:: This method should only be used if the resulting DataFrame 
is expected
-  to be small, as all the data is loaded into the driver's 
memory.
-
-.. versionadded:: 4.0.0
-
-Parameters
---
-path : str, path object, file-like object
-String, path object (implementing ``os.PathLike[str]``), or 
file-like
-object implementing a binary ``write()`` function.
-**kwargs :
-Additional keywords passed to 
:func:`pyarrow.feather.write_feather`.
-This includes the `compression`, `compression_level`, `chunksize`
-and `version` keywords.
-
-Examples
-
->>> df = ps.DataFrame([[1, 2, 3], [4, 5, 6]])
->>> df.to_feather("file.feather")  # doctest: +SKIP
-"""
-# Make sure locals() call is at the top of the function so we don't 
capture local variables.
-args = locals()
-
-return validate_arguments_and_invoke_function(
-self._to_internal_pandas(), self.to_feather, 
pd.DataFrame.to_feather, args
-)
-
-def to_stata(
-self,
-path: Union[str, IO[str]],
-*,
-convert_dates: Optional[Dict] = None,
-write_index: bool = True,
-byteorder: Optional[str] = None,
-time_stamp: Optional[datetime.datetime] = None,
-data_label: Optional[str] = None,
-variable_labels: Optional[Dict] = None,
-version: Optional[int] = 114,
-convert_strl: Optional[Sequence[Name]] = None,
-compression: str = "infer",
-storage_options: Optional[str] = None,
-value_labels: Optional[Dict] = None,
-) -> None:
-"""
-Export DataFrame object to Stata dta format.
-
-.. note:: This method should only be used if the resulting DataFrame 
is expected
-  to be small, as all the data is loaded into the driver's 
memory.
-
-.. versionadded:: 4.0.0
-
-Parameters
---
-path : str, path object, or buffer
-String, path object (implementing ``os.PathLike[str]``), or 
file-like
-object implementing a binary ``write()`` function.
-convert_dates : dict
-Dictionary mapping columns containing datetime types to stata
-internal format to use when writing the dates. Options are 'tc',
-'td', 'tm', 'tw', 'th', 'tq', 'ty'. Column can be either an integer
-or a name. Datetime columns that do not have a conversion type
-specified will be converted to 'tc'. Raises NotImplementedError if
-a datetime column has timezone information.
-write_index : bool
-Write the index to Stata dataset.
-byteorder : str
-Can be ">", "<", "little", or "big". default is `sys.byteorder`.
-time_stamp : datetime
-A datetime to use as file creation date.  Default is the current
-time.
-data_label : str, optional
-A label for the data set.  Must be 80 characters or smaller.
-variable_labels : dict
-Dictionary containing columns as keys and variable labels as
-values. Each label must be 80 characters or smaller.
-version : {{114, 117, 118, 119, None}}, default 114
-Version to use in the output dta file. Set to None to let pandas
-decide between 118 or 119 formats depending on the number of
-columns in the frame. Version 114 can be read by Stata 10 and
-later. Version 117 can be read by Stata 13 or later. Version 118
-is supported in Stata 14 and later. Version 119 is supported in
-Stata 15 and later. Version 114 limits string variables to 244
-characters or fewer while versions 117 and later allow strings
-with lengths up to 2,000,000 characters. Versions 118 and 119
-support Unicode characters, and version 119 supports more than
-32,767 variables.
-convert_strl : list, optional
-List of column names to convert to string columns to Stata StrL
-format. Only available if version is 117.  Storing strings in the
-StrL format can produce smaller dta files if strings have more than
-8 characters and values are repeated.
-value_labels : dict of dicts
-Dictionary containing columns as keys and 

Re: [PR] [SPARK-45527][CORE] Use fraction to do the resource calculation [spark]

2024-02-06 Thread via GitHub


srowen commented on code in PR #44690:
URL: https://github.com/apache/spark/pull/44690#discussion_r1480801713


##
core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala:
##
@@ -20,6 +20,49 @@ package org.apache.spark.resource
 import scala.collection.mutable
 
 import org.apache.spark.SparkException
+import org.apache.spark.resource.ResourceAmountUtils.ONE_ENTIRE_RESOURCE
+
+private[spark] object ResourceAmountUtils {
+  /**
+   * Using "double" to do the resource calculation may encounter a problem of 
precision loss. Eg
+   *
+   * scala> val taskAmount = 1.0 / 9
+   * taskAmount: Double = 0.
+   *
+   * scala> var total = 1.0
+   * total: Double = 1.0
+   *
+   * scala> for (i <- 1 to 9 ) {
+   * |   if (total >= taskAmount) {
+   * |   total -= taskAmount
+   * |   println(s"assign $taskAmount for task $i, total left: $total")
+   * |   } else {
+   * |   println(s"ERROR Can't assign $taskAmount for task $i, total 
left: $total")
+   * |   }
+   * | }
+   * assign 0. for task 1, total left: 0.
+   * assign 0. for task 2, total left: 0.
+   * assign 0. for task 3, total left: 0.6665
+   * assign 0. for task 4, total left: 0.5554
+   * assign 0. for task 5, total left: 0.44425
+   * assign 0. for task 6, total left: 0.33315
+   * assign 0. for task 7, total left: 0.22204
+   * assign 0. for task 8, total left: 0.11094
+   * ERROR Can't assign 0. for task 9, total left: 
0.11094
+   *
+   * So we multiply ONE_ENTIRE_RESOURCE to convert the double to long to avoid 
this limitation.
+   * Double can display up to 16 decimal places, so we set the factor to
+   * 10, 000, 000, 000, 000, 000L.
+   */
+  final val ONE_ENTIRE_RESOURCE: Long = 1L

Review Comment:
   No. The point of BigDecimal is that you never have to use double or float. 
This isn't actually doing math with BigDecimal.






Re: [PR] [SPARK-45527][CORE] Use fraction to do the resource calculation [spark]

2024-02-06 Thread via GitHub


wbo4958 commented on code in PR #44690:
URL: https://github.com/apache/spark/pull/44690#discussion_r1480798101


##
core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala:
##
@@ -20,6 +20,49 @@ package org.apache.spark.resource
 import scala.collection.mutable
 
 import org.apache.spark.SparkException
+import org.apache.spark.resource.ResourceAmountUtils.ONE_ENTIRE_RESOURCE
+
+private[spark] object ResourceAmountUtils {
+  /**
+   * Using "double" to do the resource calculation may encounter a problem of 
precision loss. Eg
+   *
+   * scala> val taskAmount = 1.0 / 9
+   * taskAmount: Double = 0.1111111111111111
+   *
+   * scala> var total = 1.0
+   * total: Double = 1.0
+   *
+   * scala> for (i <- 1 to 9 ) {
+   * |   if (total >= taskAmount) {
+   * |   total -= taskAmount
+   * |   println(s"assign $taskAmount for task $i, total left: $total")
+   * |   } else {
+   * |   println(s"ERROR Can't assign $taskAmount for task $i, total left: $total")
+   * |   }
+   * | }
+   * assign 0.1111111111111111 for task 1, total left: 0.8888888888888888
+   * assign 0.1111111111111111 for task 2, total left: 0.7777777777777777
+   * assign 0.1111111111111111 for task 3, total left: 0.6666666666666665
+   * assign 0.1111111111111111 for task 4, total left: 0.5555555555555554
+   * assign 0.1111111111111111 for task 5, total left: 0.44444444444444425
+   * assign 0.1111111111111111 for task 6, total left: 0.33333333333333315
+   * assign 0.1111111111111111 for task 7, total left: 0.22222222222222204
+   * assign 0.1111111111111111 for task 8, total left: 0.11111111111111094
+   * ERROR Can't assign 0.1111111111111111 for task 9, total left: 0.11111111111111094
+   *
+   * So we multiply ONE_ENTIRE_RESOURCE to convert the double to long to avoid 
this limitation.
+   * Double can display up to 16 decimal places, so we set the factor to
+   * 10, 000, 000, 000, 000, 000L.
+   */
+  final val ONE_ENTIRE_RESOURCE: Long = 10000000000000000L

Review Comment:
   Hi @srowen,
   
   The **using Long** way is quite the same as the BigDecimal one. The default scale
of BigDecimal is 16, so this PR chooses `ONE_ENTIRE_RESOURCE = 1E16.toLong`.
   
   ``` scala
   scala> val ONE_ENTIRE_RESOURCE: Long = 1E16.toLong
| val taskAmount = 0.3333333333333334
| 
| val usingLong = (taskAmount * ONE_ENTIRE_RESOURCE).toLong
| 
| val bigDec = BigDecimal(taskAmount).toDouble
   val ONE_ENTIRE_RESOURCE: Long = 10000000000000000
   val taskAmount: Double = 0.3333333333333334
   val usingLong: Long = 3333333333333334
   val bigDec: Double = 0.3333333333333334
   ``` 
   
   So if we need to ensure the input is small enough (<1/n), we can set the scale to
something like 14 for BigDecimal, and similarly, to keep aligned with BigDecimal, we
can set `ONE_ENTIRE_RESOURCE = 1E14.toLong`
   
   ``` scala
   scala> import scala.math.BigDecimal.RoundingMode
| 
| val ONE_ENTIRE_RESOURCE: Long = 1E14.toLong
| val taskAmount = 0.3333333333333334
| 
| val usingLong = (taskAmount * ONE_ENTIRE_RESOURCE).toLong
| 
| val bigDec = BigDecimal(taskAmount).setScale(14, 
RoundingMode.DOWN).toDouble
   import scala.math.BigDecimal.RoundingMode
   val ONE_ENTIRE_RESOURCE: Long = 100000000000000
   val taskAmount: Double = 0.3333333333333334
   val usingLong: Long = 33333333333333
   val bigDec: Double = 0.33333333333333
   ```
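   
   And for completeness, a hedged sketch (not the PR's `ResourceAllocator` itself) of
the same 1/9 allocation from the code comment above, tracked as a Long slice of
`ONE_ENTIRE_RESOURCE` — the ninth assignment no longer fails:
   
   ``` scala
   val ONE_ENTIRE_RESOURCE: Long = 1E16.toLong   // 10000000000000000
   val taskAmount: Long = (ONE_ENTIRE_RESOURCE * (1.0 / 9)).toLong
   
   var total: Long = ONE_ENTIRE_RESOURCE
   for (i <- 1 to 9) {
     if (total >= taskAmount) {
       total -= taskAmount
       println(s"assign $taskAmount for task $i, total left: $total")
     } else {
       println(s"ERROR Can't assign $taskAmount for task $i, total left: $total")
     }
   }
   // all nine assignments succeed; only a tiny integer remainder is left over
   ```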



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46812][SQL][PYTHON] Make mapInPandas / mapInArrow support ResourceProfile [spark]

2024-02-06 Thread via GitHub


HyukjinKwon commented on code in PR #44852:
URL: https://github.com/apache/spark/pull/44852#discussion_r1480797674


##
python/pyspark/sql/pandas/map_ops.py:
##
@@ -15,9 +15,14 @@
 # limitations under the License.
 #
 import sys
-from typing import Union, TYPE_CHECKING
+from typing import Union, TYPE_CHECKING, Optional
+
+from py4j.java_gateway import JavaObject
+
+from pyspark.resource.requests import ExecutorResourceRequests, 
TaskResourceRequests
 

Review Comment:
   ```suggestion
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46812][SQL][PYTHON] Make mapInPandas / mapInArrow support ResourceProfile [spark]

2024-02-06 Thread via GitHub


HyukjinKwon commented on code in PR #44852:
URL: https://github.com/apache/spark/pull/44852#discussion_r1480797398


##
python/pyspark/sql/tests/test_resources.py:
##
@@ -0,0 +1,104 @@
+#

Review Comment:
   This has to be added into `dev/sparktestsupport/modules.py`



##
python/pyspark/sql/tests/test_resources.py:
##
@@ -0,0 +1,104 @@
+#

Review Comment:
   This has to be added into `dev/sparktestsupport/modules.py`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46812][SQL][PYTHON] Make mapInPandas / mapInArrow support ResourceProfile [spark]

2024-02-06 Thread via GitHub


HyukjinKwon commented on code in PR #44852:
URL: https://github.com/apache/spark/pull/44852#discussion_r1480797018


##
python/pyspark/sql/pandas/map_ops.py:
##
@@ -175,6 +196,11 @@ def mapInArrow(
 
 .. versionadded: 3.5.0
 
+profile : :class:`pyspark.resource.ResourceProfile`. The optional 
ResourceProfile
+to be used for mapInPandas.

Review Comment:
   ```suggestion
   to be used for mapInArrow.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-43829][CONNECT] Improve SparkConnectPlanner by reuse Dataset and avoid construct new Dataset [spark]

2024-02-06 Thread via GitHub


beliefer commented on code in PR #43473:
URL: https://github.com/apache/spark/pull/43473#discussion_r1480770423


##
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##
@@ -115,6 +115,49 @@ class SparkConnectPlanner(
   private lazy val pythonExec =
 sys.env.getOrElse("PYSPARK_PYTHON", 
sys.env.getOrElse("PYSPARK_DRIVER_PYTHON", "python3"))
 
+  // Some relation transform need to create Dataset, then get the logical plan 
from the Dataset.
+  // This method used to reuse the Dataset instead to discard it.
+  def transformRelationAsDataset(rel: proto.Relation): Dataset[Row] = {

Review Comment:
   > TBH I want to move away from constructing Datasets wholesale. In many 
cases there is no real need, and it is also expensive to do.
   
   Yes. The current implementation creates many duplicate datasets and then
discards them. We should reuse these datasets and reduce the overhead.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-43829][CONNECT] Improve SparkConnectPlanner by reuse Dataset and avoid construct new Dataset [spark]

2024-02-06 Thread via GitHub


beliefer commented on code in PR #43473:
URL: https://github.com/apache/spark/pull/43473#discussion_r1480770588


##
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##
@@ -115,6 +115,49 @@ class SparkConnectPlanner(
   private lazy val pythonExec =
 sys.env.getOrElse("PYSPARK_PYTHON", 
sys.env.getOrElse("PYSPARK_DRIVER_PYTHON", "python3"))
 
+  // Some relation transform need to create Dataset, then get the logical plan 
from the Dataset.
+  // This method used to reuse the Dataset instead to discard it.
+  def transformRelationAsDataset(rel: proto.Relation): Dataset[Row] = {

Review Comment:
   > Is this something you want to work on?
   
   Yes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-43829][CONNECT] Improve SparkConnectPlanner by reuse Dataset and avoid construct new Dataset [spark]

2024-02-06 Thread via GitHub


beliefer commented on code in PR #43473:
URL: https://github.com/apache/spark/pull/43473#discussion_r1480770423


##
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##
@@ -115,6 +115,49 @@ class SparkConnectPlanner(
   private lazy val pythonExec =
 sys.env.getOrElse("PYSPARK_PYTHON", 
sys.env.getOrElse("PYSPARK_DRIVER_PYTHON", "python3"))
 
+  // Some relation transform need to create Dataset, then get the logical plan 
from the Dataset.
+  // This method used to reuse the Dataset instead to discard it.
+  def transformRelationAsDataset(rel: proto.Relation): Dataset[Row] = {

Review Comment:
   > TBH I want to move away from constructing Datasets wholesale. In many 
cases there is no real need, and it is also expensive to do.
   
   Yes. The current implementation creates many duplicate datasets. We should
reuse these datasets and reduce the overhead.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46963] Verify AQE is not enabled for Structured Streaming [spark]

2024-02-06 Thread via GitHub


bogao007 commented on PR #45005:
URL: https://github.com/apache/spark/pull/45005#issuecomment-1931071420

   Closing this PR, will not do the change.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46963] Verify AQE is not enabled for Structured Streaming [spark]

2024-02-06 Thread via GitHub


bogao007 closed pull request #45005: [SPARK-46963] Verify AQE is not enabled 
for Structured Streaming
URL: https://github.com/apache/spark/pull/45005


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46984][PYTHON] Remove pyspark.copy_func [spark]

2024-02-06 Thread via GitHub


HyukjinKwon closed pull request #45042: [SPARK-46984][PYTHON] Remove 
pyspark.copy_func
URL: https://github.com/apache/spark/pull/45042


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46984][PYTHON] Remove pyspark.copy_func [spark]

2024-02-06 Thread via GitHub


HyukjinKwon commented on PR #45042:
URL: https://github.com/apache/spark/pull/45042#issuecomment-1931067075

   Merged to master.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [WIP][SPARK-46947][CORE] Delay memory manager initialization until Driver plugin is loaded [spark]

2024-02-06 Thread via GitHub


sunchao commented on code in PR #45052:
URL: https://github.com/apache/spark/pull/45052#discussion_r1480751365


##
core/src/main/scala/org/apache/spark/SparkEnv.scala:
##
@@ -67,7 +67,6 @@ class SparkEnv (
 val blockManager: BlockManager,
 val securityManager: SecurityManager,
 val metricsSystem: MetricsSystem,
-val memoryManager: MemoryManager,

Review Comment:
   Yes, #43627 already requires a API change, so this just built on top of it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [WIP][SPARK-46947][CORE] Delay memory manager initialization until Driver plugin is loaded [spark]

2024-02-06 Thread via GitHub


dongjoon-hyun commented on code in PR #45052:
URL: https://github.com/apache/spark/pull/45052#discussion_r1480675486


##
core/src/main/scala/org/apache/spark/SparkEnv.scala:
##
@@ -67,7 +67,6 @@ class SparkEnv (
 val blockManager: BlockManager,
 val securityManager: SecurityManager,
 val metricsSystem: MetricsSystem,
-val memoryManager: MemoryManager,

Review Comment:
   Ack. I also pinged on #43627 . If this is broken already at Spark 4.0.0, we 
don't need to care much.
   - https://github.com/apache/spark/pull/43627#pullrequestreview-1866555431



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [WIP][SPARK-46947][CORE] Delay memory manager initialization until Driver plugin is loaded [spark]

2024-02-06 Thread via GitHub


dongjoon-hyun commented on code in PR #45052:
URL: https://github.com/apache/spark/pull/45052#discussion_r1480675486


##
core/src/main/scala/org/apache/spark/SparkEnv.scala:
##
@@ -67,7 +67,6 @@ class SparkEnv (
 val blockManager: BlockManager,
 val securityManager: SecurityManager,
 val metricsSystem: MetricsSystem,
-val memoryManager: MemoryManager,

Review Comment:
   Ack. I also pinged on #43627 . If this was broken already at Spark 4.0.0, we 
don't need to care much.
   - https://github.com/apache/spark/pull/43627#pullrequestreview-1866555431



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SS][SPARK-46928] Add support for ListState in Arbitrary State API v2. [spark]

2024-02-06 Thread via GitHub


sahnib commented on code in PR #44961:
URL: https://github.com/apache/spark/pull/44961#discussion_r1480673515


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala:
##
@@ -222,6 +232,31 @@ class StateStoreChangelogWriterV2(
 size += 1
   }
 
+  override def merge(key: Array[Byte], value: Array[Byte]): Unit = {

Review Comment:
   Thanks for bringing this up. I have a follow up question here - should we 
also support default column family in changelog v2? Is there a reason to 
restrict it to only column families? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SS][SPARK-46928] Add support for ListState in Arbitrary State API v2. [spark]

2024-02-06 Thread via GitHub


sahnib commented on code in PR #44961:
URL: https://github.com/apache/spark/pull/44961#discussion_r1480674682


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala:
##
@@ -57,7 +57,10 @@ package object state {
 sessionState: SessionState,
 storeCoordinator: Option[StateStoreCoordinatorRef],
 useColumnFamilies: Boolean = false,
-extraOptions: Map[String, String] = Map.empty)(
+extraOptions: Map[String, String] = Map.empty,
+// TODO: refactor using the boolean parameter for choosing stateful 
encoder properties

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [WIP][SPARK-46947][CORE] Delay memory manager initialization until Driver plugin is loaded [spark]

2024-02-06 Thread via GitHub


sunchao commented on PR #45052:
URL: https://github.com/apache/spark/pull/45052#issuecomment-1930961472

   I'll add some tests later. Marking as a draft for now to run through all 
existing tests.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [WIP][SPARK-46947][CORE] Delay memory manager initialization until Driver plugin is loaded [spark]

2024-02-06 Thread via GitHub


sunchao commented on code in PR #45052:
URL: https://github.com/apache/spark/pull/45052#discussion_r1480670318


##
core/src/main/scala/org/apache/spark/SparkEnv.scala:
##
@@ -67,7 +67,6 @@ class SparkEnv (
 val blockManager: BlockManager,
 val securityManager: SecurityManager,
 val metricsSystem: MetricsSystem,
-val memoryManager: MemoryManager,

Review Comment:
   Hmm actually it might be a bit difficult since we are changing 
`memoryManager` from a `val` to a method. 
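   
   A rough sketch of what the `val`-to-method shape looks like (hypothetical names,
not the real `SparkEnv`): the manager is created only after the driver plugin has
loaded, and callers reach it through a `def`:
   
   ``` scala
   class EnvSketch {
     private var _memoryManager: Option[AnyRef] = None
   
     // called once the plugin has had a chance to run
     def initializeMemoryManager(numUsableCores: Int): Unit = synchronized {
       require(_memoryManager.isEmpty, "memory manager already initialized")
       _memoryManager = Some(new Object) // stand-in for building the real manager here
     }
   
     def memoryManager: AnyRef = _memoryManager.getOrElse(
       throw new IllegalStateException("memory manager not initialized yet"))
   }
   ```
   
   Code that only reads `env.memoryManager` keeps compiling against the `def`; the
compatibility question is about callers of the old constructor signature.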



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SS][SPARK-46928] Add support for ListState in Arbitrary State API v2. [spark]

2024-02-06 Thread via GitHub


sahnib commented on code in PR #44961:
URL: https://github.com/apache/spark/pull/44961#discussion_r1480670123


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##
@@ -215,7 +253,13 @@ private[sql] class RocksDBStateStoreProvider
   (keySchema.length > numColsPrefixKey), "The number of columns in the key 
must be " +
   "greater than the number of columns for prefix key!")
 
-this.encoder = RocksDBStateEncoder.getEncoder(keySchema, valueSchema, 
numColsPrefixKey)
+if (useMultipleValuesPerKey) {

Review Comment:
   Yes, we have the option to get rid of this check post the key/value encoder 
refactoring.
   
   We can keep it if we want to support multiple values for a single key in 
default column family. It would make the supported operations consistent b/w 
default and other column families. But it can be added later, I am not strongly 
opinionated on this atm. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [WIP][SPARK-46947][CORE] Delay memory manager initialization until Driver plugin is loaded [spark]

2024-02-06 Thread via GitHub


sunchao commented on code in PR #45052:
URL: https://github.com/apache/spark/pull/45052#discussion_r1480668776


##
core/src/main/scala/org/apache/spark/SparkEnv.scala:
##
@@ -67,7 +67,6 @@ class SparkEnv (
 val blockManager: BlockManager,
 val securityManager: SecurityManager,
 val metricsSystem: MetricsSystem,
-val memoryManager: MemoryManager,

Review Comment:
   Sure. I'm pretty much doing something very similar to this PR 
https://github.com/apache/spark/pull/43627 but happy to add a new constructor.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SS][SPARK-46928] Add support for ListState in Arbitrary State API v2. [spark]

2024-02-06 Thread via GitHub


sahnib commented on code in PR #44961:
URL: https://github.com/apache/spark/pull/44961#discussion_r1480667215


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##
@@ -249,4 +259,109 @@ class NoPrefixKeyStateEncoder(keySchema: StructType, 
valueSchema: StructType)
   override def encodePrefixKey(prefixKey: UnsafeRow): Array[Byte] = {
 throw new IllegalStateException("This encoder doesn't support prefix key!")
   }
+
+  override def supportsMultipleValuesPerKey: Boolean = false
+
+  override def decodeValues(valueBytes: Array[Byte]): Iterator[UnsafeRow] = {
+throw new UnsupportedOperationException("encoder does not support multiple 
values per key")
+  }
+}
+
+/**
+ * Supports encoding multiple values per key in RocksDB.
+ * A single value is encoded in the format below, where first value is number 
of bytes
+ * in actual encodedUnsafeRow followed by the encoded value itself.
+ *
+ * |---size(bytes)--|--unsafeRowEncodedBytes--|
+ *
+ * Multiple values are separated by a delimiter character.
+ *
+ * This encoder supports RocksDB StringAppendOperator merge operator. Values 
encoded can be
+ * merged in RocksDB using merge operation, and all merged values can be read 
using decodeValues
+ * operation.
+ */
+class MultiValuedStateEncoder(keySchema: StructType, valueSchema: StructType)
+  extends RocksDBStateEncoder with Logging {
+
+  import RocksDBStateEncoder._
+
+  // Reusable objects
+  private val keyRow = new UnsafeRow(keySchema.size)
+  private val valueRow = new UnsafeRow(valueSchema.size)
+  private val rowTuple = new UnsafeRowPair()
+
+  override def supportPrefixKeyScan: Boolean = false
+
+  override def encodePrefixKey(prefixKey: UnsafeRow): Array[Byte] = {
+throw new IllegalStateException("This encoder doesn't support prefix key!")
+  }
+
+  override def extractPrefixKey(key: UnsafeRow): UnsafeRow = {
+throw new IllegalStateException("This encoder doesn't support prefix key!")
+  }
+
+  override def encodeKey(row: UnsafeRow): Array[Byte] = {
+encodeUnsafeRow(row)
+  }
+
+  override def encodeValue(row: UnsafeRow): Array[Byte] = {
+val bytes = encodeUnsafeRow(row)
+val numBytes = bytes.length
+
+val encodedBytes = new Array[Byte](java.lang.Integer.BYTES + bytes.length)
+Platform.putInt(encodedBytes, Platform.BYTE_ARRAY_OFFSET, numBytes)
+Platform.copyMemory(bytes, Platform.BYTE_ARRAY_OFFSET,
+  encodedBytes, java.lang.Integer.BYTES + Platform.BYTE_ARRAY_OFFSET, 
bytes.length)
+
+encodedBytes
+  }
+
+  override def decodeKey(keyBytes: Array[Byte]): UnsafeRow = {
+decodeToUnsafeRow(keyBytes, keyRow)
+  }
+
+  override def decodeValue(valueBytes: Array[Byte]): UnsafeRow = {
+if (valueBytes == null) {
+  null
+} else {
+  val numBytes = Platform.getInt(valueBytes, Platform.BYTE_ARRAY_OFFSET)
+  val encodedValue = new Array[Byte](numBytes)
+  Platform.copyMemory(valueBytes, java.lang.Integer.BYTES + 
Platform.BYTE_ARRAY_OFFSET,
+encodedValue, Platform.BYTE_ARRAY_OFFSET, numBytes)
+  decodeToUnsafeRow(encodedValue, valueRow)
+}
+  }
+
+  override def decodeValues(valueBytes: Array[Byte]): Iterator[UnsafeRow] = {
+if (valueBytes == null) {
+  Seq().iterator
+} else {
+  new Iterator[UnsafeRow] {
+private var pos: Int = Platform.BYTE_ARRAY_OFFSET
+private val maxPos = Platform.BYTE_ARRAY_OFFSET + valueBytes.length
+
+override def hasNext: Boolean = {
+  pos < maxPos
+}
+
+override def next(): UnsafeRow = {
+  val numBytes = Platform.getInt(valueBytes, pos)
+
+  pos += java.lang.Integer.BYTES
+  val encodedValue = new Array[Byte](numBytes)
+  Platform.copyMemory(valueBytes, pos,
+encodedValue, Platform.BYTE_ARRAY_OFFSET, numBytes)
+
+  pos += numBytes
+  pos += 1 // eat the delimiter character
+  decodeToUnsafeRow(encodedValue, valueRow)
+}
+  }
+}
+  }
+  override def supportsMultipleValuesPerKey: Boolean = true

Review Comment:
   Done.
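   
   A standalone sketch (plain Scala, hypothetical helpers — not the Spark encoder
itself) of the size-prefixed layout described in the encoder comment above, i.e.
`|4-byte length|value bytes|` entries joined by a delimiter:
   
   ``` scala
   import java.nio.ByteBuffer
   
   val Delimiter: Byte = ','.toByte
   
   // one value: 4-byte big-endian length followed by the raw bytes
   def encodeValue(value: Array[Byte]): Array[Byte] = {
     val buf = ByteBuffer.allocate(java.lang.Integer.BYTES + value.length)
     buf.putInt(value.length)
     buf.put(value)
     buf.array()
   }
   
   // walk a merged blob of delimiter-joined encoded values
   def decodeValues(merged: Array[Byte]): Iterator[Array[Byte]] = new Iterator[Array[Byte]] {
     private var pos = 0
     override def hasNext: Boolean = pos < merged.length
     override def next(): Array[Byte] = {
       val numBytes = ByteBuffer.wrap(merged, pos, java.lang.Integer.BYTES).getInt
       pos += java.lang.Integer.BYTES
       val value = java.util.Arrays.copyOfRange(merged, pos, pos + numBytes)
       pos += numBytes + 1 // skip the value bytes and the delimiter a merge appends
       value
     }
   }
   
   // e.g. two values merged the way a string-append merge operator would join them
   val merged = encodeValue(Array[Byte](1, 2)) ++ Array(Delimiter) ++ encodeValue(Array[Byte](3))
   decodeValues(merged).foreach(v => println(v.mkString(","))) // prints 1,2 then 3
   ```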



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [WIP][SPARK-46947][CORE] Delay memory manager initialization until Driver plugin is loaded [spark]

2024-02-06 Thread via GitHub


dongjoon-hyun commented on code in PR #45052:
URL: https://github.com/apache/spark/pull/45052#discussion_r1480666905


##
core/src/main/scala/org/apache/spark/SparkEnv.scala:
##
@@ -67,7 +67,6 @@ class SparkEnv (
 val blockManager: BlockManager,
 val securityManager: SecurityManager,
 val metricsSystem: MetricsSystem,
-val memoryManager: MemoryManager,

Review Comment:
   Could you avoid this breaking change by adding a new constructor, @sunchao ?
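   
   For illustration, a hypothetical sketch (names simplified, not the real `SparkEnv`
signature) of keeping the earlier shape with an auxiliary constructor when a field is
dropped from the primary one:
   
   ``` scala
   class EnvLike(
       val securityManager: AnyRef,
       val metricsSystem: AnyRef) {
   
     // auxiliary constructor matching the older, memoryManager-including signature;
     // the extra argument is accepted for source compatibility and ignored here
     def this(securityManager: AnyRef, memoryManager: AnyRef, metricsSystem: AnyRef) =
       this(securityManager, metricsSystem)
   }
   ```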



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SS][SPARK-46928] Add support for ListState in Arbitrary State API v2. [spark]

2024-02-06 Thread via GitHub


sahnib commented on code in PR #44961:
URL: https://github.com/apache/spark/pull/44961#discussion_r148084


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##
@@ -316,6 +321,25 @@ class RocksDB(
 }
   }
 
+  def merge(key: Array[Byte], value: Array[Byte],
+colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Unit 
= {

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SS][SPARK-46928] Add support for ListState in Arbitrary State API v2. [spark]

2024-02-06 Thread via GitHub


sahnib commented on code in PR #44961:
URL: https://github.com/apache/spark/pull/44961#discussion_r1480666519


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##
@@ -316,6 +321,25 @@ class RocksDB(
 }
   }
 
+  def merge(key: Array[Byte], value: Array[Byte],

Review Comment:
   Yes, thanks for catching. Added



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [WIP][SPARK-46947][CORE] Delay memory manager initialization until Driver plugin is loaded [spark]

2024-02-06 Thread via GitHub


dongjoon-hyun commented on code in PR #45052:
URL: https://github.com/apache/spark/pull/45052#discussion_r1480666428


##
core/src/main/scala/org/apache/spark/SparkContext.scala:
##
@@ -578,6 +578,8 @@ class SparkContext(config: SparkConf) extends Logging {
 // Initialize any plugins before the task scheduler is initialized.
 _plugins = PluginContainer(this, _resources.asJava)
 _env.initializeShuffleManager()
+_env.initializeMemoryManager(SparkContext.numDriverCores(master, conf))
+

Review Comment:
   nit. extra empty line



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SS][SPARK-46928] Add support for ListState in Arbitrary State API v2. [spark]

2024-02-06 Thread via GitHub


anishshri-db commented on code in PR #44961:
URL: https://github.com/apache/spark/pull/44961#discussion_r1480665787


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##
@@ -316,6 +321,25 @@ class RocksDB(
 }
   }
 
+  def merge(key: Array[Byte], value: Array[Byte],
+colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Unit 
= {
+verifyColFamilyExists(colFamilyName)
+
+if (conf.trackTotalNumberOfRows) {
+  val oldValue = db.get(colFamilyNameToHandleMap(colFamilyName), 
readOptions, key)
+  if (oldValue == null) {
+numKeysOnWritingVersion += 1
+  }
+}
+db.merge(colFamilyNameToHandleMap(colFamilyName), writeOptions, key, value)
+
+if (useColumnFamilies) {
+  changelogWriter.foreach(_.merge(key, value, colFamilyName))
+} else {
+  changelogWriter.foreach(_.merge(key, value))

Review Comment:
   Yea lets support it for default col family - that could be useful in the 
future



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SS][SPARK-46928] Add support for ListState in Arbitrary State API v2. [spark]

2024-02-06 Thread via GitHub


sahnib commented on code in PR #44961:
URL: https://github.com/apache/spark/pull/44961#discussion_r1480664465


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##
@@ -316,6 +321,25 @@ class RocksDB(
 }
   }
 
+  def merge(key: Array[Byte], value: Array[Byte],
+colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Unit 
= {
+verifyColFamilyExists(colFamilyName)
+
+if (conf.trackTotalNumberOfRows) {
+  val oldValue = db.get(colFamilyNameToHandleMap(colFamilyName), 
readOptions, key)
+  if (oldValue == null) {
+numKeysOnWritingVersion += 1
+  }
+}
+db.merge(colFamilyNameToHandleMap(colFamilyName), writeOptions, key, value)
+
+if (useColumnFamilies) {
+  changelogWriter.foreach(_.merge(key, value, colFamilyName))
+} else {
+  changelogWriter.foreach(_.merge(key, value))

Review Comment:
   I agree we probably won't use it in near future, but I don't see a reason to 
restrict it only to column families. The changes work as expected, and can be 
used for `default` column family in future if need arises. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SS][SPARK-46928] Add support for ListState in Arbitrary State API v2. [spark]

2024-02-06 Thread via GitHub


sahnib commented on code in PR #44961:
URL: https://github.com/apache/spark/pull/44961#discussion_r1480662135


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateTypesEncoderUtils.scala:
##
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import org.apache.commons.lang3.SerializationUtils
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.execution.streaming.state.StateStoreErrors
+import org.apache.spark.sql.types.{BinaryType, StructType}
+
+/**
+ * Helper object providing APIs to encodes the grouping key, and user provided 
values
+ * to Spark [[UnsafeRow]].
+ */
+object StateTypesEncoderUtils {
+
+  // TODO: validate places that are trying to encode the key and check if we 
can eliminate/
+  // add caching for some of these calls.
+  def encodeGroupingKey(stateName: String, keyExprEnc: 
ExpressionEncoder[Any]): UnsafeRow = {
+val keyOption = ImplicitGroupingKeyTracker.getImplicitKeyOption
+if (keyOption.isEmpty) {
+  throw StateStoreErrors.implicitKeyNotFound(stateName)
+}
+
+val toRow = keyExprEnc.createSerializer()
+val keyByteArr = toRow
+  .apply(keyOption.get).asInstanceOf[UnsafeRow].getBytes()
+
+val schemaForKeyRow: StructType = new StructType().add("key", BinaryType)

Review Comment:
   Done. I think class private val is okay. We don't expect them to change 
across State variables.
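   
   A small sketch of that caching (hypothetical class, not the actual encoder): the key
schema and its projection become instance-level vals built once, instead of being
rebuilt on every encodeGroupingKey call:
   
   ``` scala
   import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection}
   import org.apache.spark.sql.types.{BinaryType, StructType}
   
   class GroupingKeyEncoderSketch {
     // built once per encoder instance
     private val schemaForKeyRow: StructType = new StructType().add("key", BinaryType)
     private val keyProjection: UnsafeProjection = UnsafeProjection.create(schemaForKeyRow)
   
     // wraps already-serialized key bytes into an UnsafeRow using the cached projection
     def encodeKeyBytes(keyByteArr: Array[Byte]) =
       keyProjection(new GenericInternalRow(Array[Any](keyByteArr)))
   }
   ```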



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[PR] [WIP][SPARK-46947][CORE] Delay memory manager initialization until Driver plugin is loaded [spark]

2024-02-06 Thread via GitHub


sunchao opened a new pull request, #45052:
URL: https://github.com/apache/spark/pull/45052

   
   
   ### What changes were proposed in this pull request?
   
   
   
   ### Why are the changes needed?
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   
   ### How was this patch tested?
   
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[PR] [SPARK-46913][WIP] Add support for processing/event time based timers with transformWithState operator [spark]

2024-02-06 Thread via GitHub


anishshri-db opened a new pull request, #45051:
URL: https://github.com/apache/spark/pull/45051

   ### What changes were proposed in this pull request?
   Add support for processing/event time based timers with transformWithState 
operator
   
   ### Why are the changes needed?
   Changes are required to add event-driven timer based support for stateful 
streaming applications based on state api v2
   
   ### Does this PR introduce _any_ user-facing change?
   Yes
   
   ### How was this patch tested?
   Added unit tests
   
   ### Was this patch authored or co-authored using generative AI tooling?
   No
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46962][SS][PYTHON] Implement python worker to run python streaming data source [spark]

2024-02-06 Thread via GitHub


allisonwang-db commented on code in PR #45023:
URL: https://github.com/apache/spark/pull/45023#discussion_r1480622631


##
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonScan.scala:
##
@@ -45,13 +63,50 @@ class PythonScan(
 new PythonPartitionReaderFactory(
   ds.source, readerFunc, outputSchema, jobArtifactUUID)
   }
+}
 
-  override def toBatch: Batch = this
+case class PythonStreamingSourceOffset(json: String) extends Offset
 
-  override def description: String = "(Python)"
+case class PythonStreamingSourcePartition(partition: Array[Byte]) extends 
InputPartition
 
-  override def readSchema(): StructType = outputSchema
+class PythonMicroBatchStream(

Review Comment:
   should we put this in a different file? 



##
sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonStreamingSourceRunner.scala:
##
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.spark.sql.execution.python
+
+import java.io.{BufferedInputStream, BufferedOutputStream, DataInputStream, 
DataOutputStream}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.jdk.CollectionConverters._
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.api.python.{PythonFunction, PythonWorker, 
PythonWorkerFactory, PythonWorkerUtils, SpecialLengths}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.BUFFER_SIZE
+import org.apache.spark.internal.config.Python.{PYTHON_AUTH_SOCKET_TIMEOUT, 
PYTHON_USE_DAEMON}
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+
+object PythonStreamingSourceRunner {
+  val partitionsFuncId = 886
+  val latestOffsetsFuncId = 887

Review Comment:
   what are these hardcoded numbers?



##
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/python/PythonScan.scala:
##
@@ -27,9 +30,24 @@ class PythonScan(
  ds: PythonDataSourceV2,
  shortName: String,
  outputSchema: StructType,
- options: CaseInsensitiveStringMap) extends Batch with Scan {
+ options: CaseInsensitiveStringMap) extends Scan {
 
-  private[this] val jobArtifactUUID = 
JobArtifactSet.getCurrentJobArtifactState.map(_.uuid)
+  override def toBatch: Batch = new PythonBatch(ds, shortName, outputSchema, 
options)
+
+  override def toMicroBatchStream(checkpointLocation: String): 
MicroBatchStream =
+new PythonMicroBatchStream(ds, shortName, outputSchema, options)
+
+  override def description: String = "(Python)"
+
+  override def readSchema(): StructType = outputSchema
+
+  override def supportedCustomMetrics(): Array[CustomMetric] =
+ds.source.createPythonMetrics()
+}
+
+class PythonBatch(ds: PythonDataSourceV2, shortName: String,
+  outputSchema: StructType, options: CaseInsensitiveStringMap) 
extends Batch {

Review Comment:
   nit: indent



##
sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonStreamingSourceRunner.scala:
##
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.spark.sql.execution.python
+
+import java.io.{BufferedInputStream, BufferedOutputStream, DataInputStream, 
DataOutputStream}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.jdk.CollectionConverters._
+
+import org.apache.spark.SparkEnv
+import 

Re: [PR] [MINOR][PYTHON] refactor PythonWrite to prepare for supporting python data source streaming write [spark]

2024-02-06 Thread via GitHub


xinrong-meng commented on PR #45049:
URL: https://github.com/apache/spark/pull/45049#issuecomment-1930884879

   Would you create a Spark JIRA https://issues.apache.org/jira/browse/SPARK 
and add it to the PR title? Please refer to 
https://spark.apache.org/contributing.html for details. Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46989][SQL][CONNECT] Improve concurrency performance for SparkSession [spark]

2024-02-06 Thread via GitHub


ueshin commented on code in PR #45046:
URL: https://github.com/apache/spark/pull/45046#discussion_r1480617495


##
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala:
##
@@ -928,10 +929,10 @@ object SparkSession extends Logging {
  *
  * @since 3.5.0
  */
-def config(map: Map[String, Any]): Builder = synchronized {
+def config(map: Map[String, Any]): Builder = {
   map.foreach { kv: (String, Any) =>
 {
-  options += kv._1 -> kv._2.toString
+  options.put(kv._1, kv._2.toString)

Review Comment:
   This change seems to be a breaking change, as another thread can now read/write
between the individual `put` calls, whereas it couldn't before. How about using
`putAll` instead?
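   
   A hedged illustration of the interleaving concern (plain `ConcurrentHashMap`, not
the actual `Builder`): with per-entry puts, another thread can observe the map after
only some of the entries have been applied.
   
   ``` scala
   import java.util.concurrent.ConcurrentHashMap
   import scala.jdk.CollectionConverters._
   
   val options = new ConcurrentHashMap[String, String]()
   
   def configPerEntry(map: Map[String, String]): Unit =
     map.foreach { case (k, v) => options.put(k, v) } // readers can interleave between puts
   
   def configBulk(map: Map[String, String]): Unit =
     options.putAll(map.asJava) // one bulk call on the caller side; note that
                                // ConcurrentHashMap.putAll is itself not atomic,
                                // so full atomicity still needs external locking
   ```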



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46984][PYTHON] Remove pyspark.copy_func [spark]

2024-02-06 Thread via GitHub


xinrong-meng commented on PR #45042:
URL: https://github.com/apache/spark/pull/45042#issuecomment-1930881092

   LGTM once tests passed, thank you!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46987][CONNECT] `ProtoUtils.abbreviate` avoid unnecessary `setField` operation [spark]

2024-02-06 Thread via GitHub


xinrong-meng commented on PR #45045:
URL: https://github.com/apache/spark/pull/45045#issuecomment-193088

   LGTM once tests passed, thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46865][SS] Add Batch Support for TransformWithState Operator [spark]

2024-02-06 Thread via GitHub


ericm-db commented on code in PR #44884:
URL: https://github.com/apache/spark/pull/44884#discussion_r1480608319


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##
@@ -155,23 +161,112 @@ case class TransformWithStateExec(
   override protected def doExecute(): RDD[InternalRow] = {
 metrics // force lazy init at driver
 
-child.execute().mapPartitionsWithStateStore[InternalRow](
-  getStateInfo,
-  schemaForKeyRow,
-  schemaForValueRow,
-  numColsPrefixKey = 0,
-  session.sqlContext.sessionState,
-  Some(session.sqlContext.streams.stateStoreCoordinator),
-  useColumnFamilies = true
-) {
-  case (store: StateStore, singleIterator: Iterator[InternalRow]) =>
-val processorHandle = new StatefulProcessorHandleImpl(store, 
getStateInfo.queryRunId,
-  keyEncoder)
-assert(processorHandle.getHandleState == 
StatefulProcessorHandleState.CREATED)
-statefulProcessor.init(processorHandle, outputMode)
-
processorHandle.setHandleState(StatefulProcessorHandleState.INITIALIZED)
-val result = processDataWithPartition(singleIterator, store, 
processorHandle)
-result
+if (isStreaming) {
+  child.execute().mapPartitionsWithStateStore[InternalRow](
+getStateInfo,
+schemaForKeyRow,
+schemaForValueRow,
+numColsPrefixKey = 0,
+session.sqlContext.sessionState,
+Some(session.sqlContext.streams.stateStoreCoordinator),
+useColumnFamilies = true
+  ) {
+case (store: StateStore, singleIterator: Iterator[InternalRow]) =>
+  processData(store, singleIterator)
+  }
+} else {
+  // If the query is running in batch mode, we need to create a new 
StateStore and instantiate
+  // a temp directory on the executors in mapPartitionsWithIndex.
+  child.execute().mapPartitionsWithIndex[InternalRow](
+(i, iter) => {
+  val providerId = {
+// lazy creation to initialize tempDirPath once
+lazy val tempDirPath = Utils.createTempDir().getAbsolutePath

Review Comment:
   I thought it was good enough as long as it didn't initialize per partition, 
and once per executor was okay



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46865][SS] Add Batch Support for TransformWithState Operator [spark]

2024-02-06 Thread via GitHub


anishshri-db commented on code in PR #44884:
URL: https://github.com/apache/spark/pull/44884#discussion_r1480600742


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##
@@ -155,23 +161,112 @@ case class TransformWithStateExec(
   override protected def doExecute(): RDD[InternalRow] = {
 metrics // force lazy init at driver
 
-child.execute().mapPartitionsWithStateStore[InternalRow](
-  getStateInfo,
-  schemaForKeyRow,
-  schemaForValueRow,
-  numColsPrefixKey = 0,
-  session.sqlContext.sessionState,
-  Some(session.sqlContext.streams.stateStoreCoordinator),
-  useColumnFamilies = true
-) {
-  case (store: StateStore, singleIterator: Iterator[InternalRow]) =>
-val processorHandle = new StatefulProcessorHandleImpl(store, 
getStateInfo.queryRunId,
-  keyEncoder)
-assert(processorHandle.getHandleState == 
StatefulProcessorHandleState.CREATED)
-statefulProcessor.init(processorHandle, outputMode)
-
processorHandle.setHandleState(StatefulProcessorHandleState.INITIALIZED)
-val result = processDataWithPartition(singleIterator, store, 
processorHandle)
-result
+if (isStreaming) {
+  child.execute().mapPartitionsWithStateStore[InternalRow](
+getStateInfo,
+schemaForKeyRow,
+schemaForValueRow,
+numColsPrefixKey = 0,
+session.sqlContext.sessionState,
+Some(session.sqlContext.streams.stateStoreCoordinator),
+useColumnFamilies = true
+  ) {
+case (store: StateStore, singleIterator: Iterator[InternalRow]) =>
+  processData(store, singleIterator)
+  }
+} else {
+  // If the query is running in batch mode, we need to create a new 
StateStore and instantiate
+  // a temp directory on the executors in mapPartitionsWithIndex.
+  child.execute().mapPartitionsWithIndex[InternalRow](
+(i, iter) => {
+  val providerId = {
+// lazy creation to initialize tempDirPath once
+lazy val tempDirPath = Utils.createTempDir().getAbsolutePath

Review Comment:
   I don't think this will evaluate once. But do we need it to evaluate once ? 
I thought we were fine passing a tmp path to each executor here, given that the 
store instance is not tracked/checkpointed ?
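   
   A quick Spark-free illustration of the scoping in question: a `lazy val` declared
inside the per-partition closure is re-created on every invocation of that closure, so
it is evaluated once per partition, not once per executor.
   
   ``` scala
   def mapPartitionsLike(numPartitions: Int): Unit = {
     (0 until numPartitions).foreach { i =>
       // this mirrors "lazy creation inside the closure" from the diff above
       lazy val tempDirPath: String = {
         println(s"creating temp dir while processing partition $i")
         s"/tmp/provider-$i"
       }
       val providerId = tempDirPath // first access forces evaluation
       println(s"partition $i -> $providerId")
     }
   }
   
   mapPartitionsLike(3) // "creating temp dir ..." is printed three times, not once
   ```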



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[PR] [SPARK-46689][SPARK-46690][PYTHON][CONNECT] Support v2 profiling in group/cogroup applyInPandas [spark]

2024-02-06 Thread via GitHub


xinrong-meng opened a new pull request, #45050:
URL: https://github.com/apache/spark/pull/45050

   ### What changes were proposed in this pull request?
   Support v2 (perf, memory) profiling in group/cogroup applyInPandas, which 
rely on physical plan nodes FlatMapGroupsInBatchExec and 
FlatMapCoGroupsInBatchExec.
   
   ### Why are the changes needed?
   Complete v2 profiling support.
   
   ### Does this PR introduce _any_ user-facing change?
   Yes. V2 profiling in group/cogroup applyInPandas is supported.
   
   ### How was this patch tested?
   Unit tests.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   No.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SS][SPARK-46928] Add support for ListState in Arbitrary State API v2. [spark]

2024-02-06 Thread via GitHub


anishshri-db commented on code in PR #44961:
URL: https://github.com/apache/spark/pull/44961#discussion_r1480592060


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##
@@ -65,6 +65,43 @@ private[sql] class RocksDBStateStoreProvider
   value
 }
 
+override def valuesIterator(key: UnsafeRow, colFamilyName: String): 
Iterator[UnsafeRow] = {
+  verify(key != null, "Key cannot be null")
+  verify(encoder.supportsMultipleValuesPerKey, "valuesIterator requires a 
encoder " +
+  "that supports multiple values for a single key.")
+  val valueIterator = 
encoder.decodeValues(rocksDB.get(encoder.encodeKey(key), colFamilyName))
+
+  if (!isValidated && valueIterator.nonEmpty) {

Review Comment:
   hmm - why do we have this check ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SS][SPARK-46928] Add support for ListState in Arbitrary State API v2. [spark]

2024-02-06 Thread via GitHub


anishshri-db commented on code in PR #44961:
URL: https://github.com/apache/spark/pull/44961#discussion_r1480590932


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##
@@ -65,6 +65,43 @@ private[sql] class RocksDBStateStoreProvider
   value
 }
 
+override def valuesIterator(key: UnsafeRow, colFamilyName: String): 
Iterator[UnsafeRow] = {
+  verify(key != null, "Key cannot be null")
+  verify(encoder.supportsMultipleValuesPerKey, "valuesIterator requires a 
encoder " +
+  "that supports multiple values for a single key.")
+  val valueIterator = 
encoder.decodeValues(rocksDB.get(encoder.encodeKey(key), colFamilyName))
+
+  if (!isValidated && valueIterator.nonEmpty) {
+new Iterator[UnsafeRow] {
+  override def hasNext: Boolean = {
+valueIterator.hasNext
+  }
+
+  override def next(): UnsafeRow = {
+val value = valueIterator.next()
+if (!isValidated && value != null) {

Review Comment:
   We should probably skip this if `useColumnFamilies` is enabled ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SS][SPARK-46928] Add support for ListState in Arbitrary State API v2. [spark]

2024-02-06 Thread via GitHub


anishshri-db commented on code in PR #44961:
URL: https://github.com/apache/spark/pull/44961#discussion_r1480590125


##
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithListStateSuite.scala:
##
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import 
org.apache.spark.sql.execution.streaming.state.{AlsoTestWithChangelogCheckpointingEnabled,
 RocksDBStateStoreProvider}
+import org.apache.spark.sql.internal.SQLConf
+
+case class InputRow(key: String, action: String, value: String)
+
+class TestListStateProcessor
+  extends StatefulProcessor[String, InputRow, (String, String)] {
+
+  @transient var _processorHandle: StatefulProcessorHandle = _
+  @transient var _listState: ListState[String] = _
+
+  override def init(handle: StatefulProcessorHandle, outputMode: OutputMode): 
Unit = {
+_processorHandle = handle
+_listState = handle.getListState("testListState")
+  }
+
+  override def handleInputRows(key: String,
+  rows: Iterator[InputRow],
+  timerValues: TimerValues): Iterator[(String, String)] = {
+
+var output = List[(String, String)]()
+
+for (row <- rows) {
+  if (row.action == "emit") {
+output = (key, row.value) :: output
+  } else if (row.action == "emitAllInState") {
+_listState.get().foreach(v => {
+  output = (key, v) :: output
+})
+_listState.remove()
+  } else if (row.action == "append") {
+_listState.appendValue(row.value)
+  } else if (row.action == "appendAll") {
+_listState.appendList(row.value.split(","))
+  } else if (row.action == "put") {
+_listState.put(row.value.split(","))
+  } else if (row.action == "remove") {
+_listState.remove()
+  } else if (row.action == "tryAppendingNull") {
+_listState.appendValue(null)
+  } else if (row.action == "tryAppendingNullValueInList") {
+_listState.appendList(Array(null))
+  } else if (row.action == "tryAppendingNullList") {
+_listState.appendList(null)
+  } else if (row.action == "tryPutNullList") {
+_listState.put(null)
+  } else if (row.action == "tryPuttingNullInList") {
+_listState.put(Array(null))
+  }
+}
+
+output.iterator
+  }
+
+
+  override def close(): Unit = {
+  }
+}
+
+class TransformWithListStateSuite extends StreamTest
+  with AlsoTestWithChangelogCheckpointingEnabled {
+  import testImplicits._
+
+  test("test appending null value in list state throw exception") {
+withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
+  classOf[RocksDBStateStoreProvider].getName) {
+
+  val inputData = MemoryStream[InputRow]
+  val result = inputData.toDS()
+.groupByKey(x => x.key)
+.transformWithState(new TestListStateProcessor(),
+  TimeoutMode.NoTimeouts(),
+  OutputMode.Update())
+
+  testStream(result, OutputMode.Update()) (
+AddData(inputData, InputRow("k1", "tryAppendingNull", "")),
+ExpectFailure[SparkException](e => {
+  assert(e.getMessage.contains("CANNOT_WRITE_STATE_STORE.NULL_VALUE"))
+})
+  )
+}
+  }
+
+  test("test putting null value in list state throw exception") {
+withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
+  classOf[RocksDBStateStoreProvider].getName) {
+
+  val inputData = MemoryStream[InputRow]
+  val result = inputData.toDS()
+.groupByKey(x => x.key)
+.transformWithState(new TestListStateProcessor(),
+  TimeoutMode.NoTimeouts(),
+  OutputMode.Update())
+
+  testStream(result, OutputMode.Update())(
+AddData(inputData, InputRow("k1", "tryPuttingNullInList", "")),
+ExpectFailure[SparkException](e => {
+  assert(e.getMessage.contains("CANNOT_WRITE_STATE_STORE.NULL_VALUE"))
+})
+  )
+}
+  }
+
+  test("test putting null list in list state throw exception") {
+withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
+  classOf[RocksDBStateStoreProvider].getName) {
+
+  val inputData = 

Re: [PR] [SS][SPARK-46928] Add support for ListState in Arbitrary State API v2. [spark]

2024-02-06 Thread via GitHub


anishshri-db commented on code in PR #44961:
URL: https://github.com/apache/spark/pull/44961#discussion_r1480589830


##
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithListStateSuite.scala:
##
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import 
org.apache.spark.sql.execution.streaming.state.{AlsoTestWithChangelogCheckpointingEnabled,
 RocksDBStateStoreProvider}
+import org.apache.spark.sql.internal.SQLConf
+
+case class InputRow(key: String, action: String, value: String)
+
+class TestListStateProcessor
+  extends StatefulProcessor[String, InputRow, (String, String)] {
+
+  @transient var _processorHandle: StatefulProcessorHandle = _
+  @transient var _listState: ListState[String] = _
+
+  override def init(handle: StatefulProcessorHandle, outputMode: OutputMode): 
Unit = {
+_processorHandle = handle
+_listState = handle.getListState("testListState")
+  }
+
+  override def handleInputRows(key: String,
+  rows: Iterator[InputRow],
+  timerValues: TimerValues): Iterator[(String, String)] = {
+
+var output = List[(String, String)]()
+
+for (row <- rows) {
+  if (row.action == "emit") {
+output = (key, row.value) :: output
+  } else if (row.action == "emitAllInState") {
+_listState.get().foreach(v => {
+  output = (key, v) :: output
+})
+_listState.remove()
+  } else if (row.action == "append") {
+_listState.appendValue(row.value)
+  } else if (row.action == "appendAll") {
+_listState.appendList(row.value.split(","))
+  } else if (row.action == "put") {
+_listState.put(row.value.split(","))
+  } else if (row.action == "remove") {
+_listState.remove()
+  } else if (row.action == "tryAppendingNull") {
+_listState.appendValue(null)
+  } else if (row.action == "tryAppendingNullValueInList") {
+_listState.appendList(Array(null))
+  } else if (row.action == "tryAppendingNullList") {
+_listState.appendList(null)
+  } else if (row.action == "tryPutNullList") {
+_listState.put(null)
+  } else if (row.action == "tryPuttingNullInList") {
+_listState.put(Array(null))
+  }
+}
+
+output.iterator
+  }
+
+
+  override def close(): Unit = {
+  }
+}
+
+class TransformWithListStateSuite extends StreamTest
+  with AlsoTestWithChangelogCheckpointingEnabled {
+  import testImplicits._
+
+  test("test appending null value in list state throw exception") {

Review Comment:
   Should we move these tests to a separate suite such as 
`TransformWithListStateValidationSuite` ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SS][SPARK-46928] Add support for ListState in Arbitrary State API v2. [spark]

2024-02-06 Thread via GitHub


anishshri-db commented on code in PR #44961:
URL: https://github.com/apache/spark/pull/44961#discussion_r1480588124


##
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithListStateSuite.scala:
##
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import 
org.apache.spark.sql.execution.streaming.state.{AlsoTestWithChangelogCheckpointingEnabled,
 RocksDBStateStoreProvider}
+import org.apache.spark.sql.internal.SQLConf
+
+case class InputRow(key: String, action: String, value: String)
+
+class TestListStateProcessor
+  extends StatefulProcessor[String, InputRow, (String, String)] {
+
+  @transient var _processorHandle: StatefulProcessorHandle = _
+  @transient var _listState: ListState[String] = _
+
+  override def init(handle: StatefulProcessorHandle, outputMode: OutputMode): 
Unit = {
+_processorHandle = handle
+_listState = handle.getListState("testListState")
+  }
+
+  override def handleInputRows(key: String,
+  rows: Iterator[InputRow],
+  timerValues: TimerValues): Iterator[(String, String)] = {
+
+var output = List[(String, String)]()
+
+for (row <- rows) {
+  if (row.action == "emit") {
+output = (key, row.value) :: output
+  } else if (row.action == "emitAllInState") {
+_listState.get().foreach(v => {
+  output = (key, v) :: output
+})
+_listState.remove()
+  } else if (row.action == "append") {
+_listState.appendValue(row.value)
+  } else if (row.action == "appendAll") {
+_listState.appendList(row.value.split(","))
+  } else if (row.action == "put") {
+_listState.put(row.value.split(","))
+  } else if (row.action == "remove") {
+_listState.remove()
+  } else if (row.action == "tryAppendingNull") {
+_listState.appendValue(null)
+  } else if (row.action == "tryAppendingNullValueInList") {
+_listState.appendList(Array(null))
+  } else if (row.action == "tryAppendingNullList") {
+_listState.appendList(null)
+  } else if (row.action == "tryPutNullList") {
+_listState.put(null)
+  } else if (row.action == "tryPuttingNullInList") {
+_listState.put(Array(null))
+  }
+}
+
+output.iterator
+  }
+
+

Review Comment:
   Nit: extra newline ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SS][SPARK-46928] Add support for ListState in Arbitrary State API v2. [spark]

2024-02-06 Thread via GitHub


anishshri-db commented on code in PR #44961:
URL: https://github.com/apache/spark/pull/44961#discussion_r1480587660


##
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##
@@ -845,6 +859,96 @@ class RocksDBSuite extends 
AlsoTestWithChangelogCheckpointingEnabled with Shared
 }
   }
 
+  testWithChangelogCheckpointingEnabled("ensure merge operation is not 
supported" +
+" with changelog checkpoint if column families is not enabled") {
+withTempDir { dir =>
+  val remoteDir = Utils.createTempDir().toString
+  val conf = dbConf.copy(minDeltasForSnapshot = 5, compactOnCommit = false)
+  new File(remoteDir).delete() // to make sure that the directory gets 
created
+  withDB(remoteDir, conf = conf, useColumnFamilies = false) { db =>
+db.load(0)
+db.put("a", "1")
+intercept[UnsupportedOperationException](
+  db.merge("a", "2")
+)
+  }
+}
+  }
+
+  testWithChangelogCheckpointingDisabled("ensure merge operation is supported" 
+
+" without changelog checkpoint if column families is not enabled") {

Review Comment:
   Hmm - why do we have these tests ? Which case are we trying to cover here ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SS][SPARK-46928] Add support for ListState in Arbitrary State API v2. [spark]

2024-02-06 Thread via GitHub


anishshri-db commented on code in PR #44961:
URL: https://github.com/apache/spark/pull/44961#discussion_r1480586073


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala:
##
@@ -57,7 +57,10 @@ package object state {
 sessionState: SessionState,
 storeCoordinator: Option[StateStoreCoordinatorRef],
 useColumnFamilies: Boolean = false,
-extraOptions: Map[String, String] = Map.empty)(
+extraOptions: Map[String, String] = Map.empty,
+// TODO: refactor using the boolean parameter for choosing stateful 
encoder properties

Review Comment:
   Should we remove this comment ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SS][SPARK-46928] Add support for ListState in Arbitrary State API v2. [spark]

2024-02-06 Thread via GitHub


anishshri-db commented on code in PR #44961:
URL: https://github.com/apache/spark/pull/44961#discussion_r1480578009


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala:
##
@@ -222,6 +232,31 @@ class StateStoreChangelogWriterV2(
 size += 1
   }
 
+  override def merge(key: Array[Byte], value: Array[Byte]): Unit = {

Review Comment:
   This interface is not supported in either the v1 or the v2 writer. Should we just 
remove it then ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SS][SPARK-46928] Add support for ListState in Arbitrary State API v2. [spark]

2024-02-06 Thread via GitHub


anishshri-db commented on code in PR #44961:
URL: https://github.com/apache/spark/pull/44961#discussion_r1480576634


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##
@@ -215,7 +253,13 @@ private[sql] class RocksDBStateStoreProvider
   (keySchema.length > numColsPrefixKey), "The number of columns in the key 
must be " +
   "greater than the number of columns for prefix key!")
 
-this.encoder = RocksDBStateEncoder.getEncoder(keySchema, valueSchema, 
numColsPrefixKey)
+if (useMultipleValuesPerKey) {

Review Comment:
   After my refactoring change - we can get rid of this check I guess



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SS][SPARK-46928] Add support for ListState in Arbitrary State API v2. [spark]

2024-02-06 Thread via GitHub


anishshri-db commented on code in PR #44961:
URL: https://github.com/apache/spark/pull/44961#discussion_r1480575858


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##
@@ -249,4 +259,109 @@ class NoPrefixKeyStateEncoder(keySchema: StructType, 
valueSchema: StructType)
   override def encodePrefixKey(prefixKey: UnsafeRow): Array[Byte] = {
 throw new IllegalStateException("This encoder doesn't support prefix key!")
   }
+
+  override def supportsMultipleValuesPerKey: Boolean = false
+
+  override def decodeValues(valueBytes: Array[Byte]): Iterator[UnsafeRow] = {
+throw new UnsupportedOperationException("encoder does not support multiple 
values per key")
+  }
+}
+
+/**
+ * Supports encoding multiple values per key in RocksDB.
+ * A single value is encoded in the format below, where first value is number 
of bytes
+ * in actual encodedUnsafeRow followed by the encoded value itself.
+ *
+ * |---size(bytes)--|--unsafeRowEncodedBytes--|
+ *
+ * Multiple values are separated by a delimiter character.
+ *
+ * This encoder supports RocksDB StringAppendOperator merge operator. Values 
encoded can be
+ * merged in RocksDB using merge operation, and all merged values can be read 
using decodeValues
+ * operation.
+ */
+class MultiValuedStateEncoder(keySchema: StructType, valueSchema: StructType)
+  extends RocksDBStateEncoder with Logging {
+
+  import RocksDBStateEncoder._
+
+  // Reusable objects
+  private val keyRow = new UnsafeRow(keySchema.size)
+  private val valueRow = new UnsafeRow(valueSchema.size)
+  private val rowTuple = new UnsafeRowPair()
+
+  override def supportPrefixKeyScan: Boolean = false
+
+  override def encodePrefixKey(prefixKey: UnsafeRow): Array[Byte] = {
+throw new IllegalStateException("This encoder doesn't support prefix key!")
+  }
+
+  override def extractPrefixKey(key: UnsafeRow): UnsafeRow = {
+throw new IllegalStateException("This encoder doesn't support prefix key!")
+  }
+
+  override def encodeKey(row: UnsafeRow): Array[Byte] = {
+encodeUnsafeRow(row)
+  }
+
+  override def encodeValue(row: UnsafeRow): Array[Byte] = {
+val bytes = encodeUnsafeRow(row)
+val numBytes = bytes.length
+
+val encodedBytes = new Array[Byte](java.lang.Integer.BYTES + bytes.length)
+Platform.putInt(encodedBytes, Platform.BYTE_ARRAY_OFFSET, numBytes)
+Platform.copyMemory(bytes, Platform.BYTE_ARRAY_OFFSET,
+  encodedBytes, java.lang.Integer.BYTES + Platform.BYTE_ARRAY_OFFSET, 
bytes.length)
+
+encodedBytes
+  }
+
+  override def decodeKey(keyBytes: Array[Byte]): UnsafeRow = {
+decodeToUnsafeRow(keyBytes, keyRow)
+  }
+
+  override def decodeValue(valueBytes: Array[Byte]): UnsafeRow = {
+if (valueBytes == null) {
+  null
+} else {
+  val numBytes = Platform.getInt(valueBytes, Platform.BYTE_ARRAY_OFFSET)
+  val encodedValue = new Array[Byte](numBytes)
+  Platform.copyMemory(valueBytes, java.lang.Integer.BYTES + 
Platform.BYTE_ARRAY_OFFSET,
+encodedValue, Platform.BYTE_ARRAY_OFFSET, numBytes)
+  decodeToUnsafeRow(encodedValue, valueRow)
+}
+  }
+
+  override def decodeValues(valueBytes: Array[Byte]): Iterator[UnsafeRow] = {
+if (valueBytes == null) {
+  Seq().iterator
+} else {
+  new Iterator[UnsafeRow] {
+private var pos: Int = Platform.BYTE_ARRAY_OFFSET
+private val maxPos = Platform.BYTE_ARRAY_OFFSET + valueBytes.length
+
+override def hasNext: Boolean = {
+  pos < maxPos
+}
+
+override def next(): UnsafeRow = {
+  val numBytes = Platform.getInt(valueBytes, pos)
+
+  pos += java.lang.Integer.BYTES
+  val encodedValue = new Array[Byte](numBytes)
+  Platform.copyMemory(valueBytes, pos,
+encodedValue, Platform.BYTE_ARRAY_OFFSET, numBytes)
+
+  pos += numBytes
+  pos += 1 // eat the delimiter character
+  decodeToUnsafeRow(encodedValue, valueRow)
+}
+  }
+}
+  }
+  override def supportsMultipleValuesPerKey: Boolean = true

Review Comment:
   Nit: can we add a newline here ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SS][SPARK-46928] Add support for ListState in Arbitrary State API v2. [spark]

2024-02-06 Thread via GitHub


anishshri-db commented on code in PR #44961:
URL: https://github.com/apache/spark/pull/44961#discussion_r1480574813


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##
@@ -316,6 +321,25 @@ class RocksDB(
 }
   }
 
+  def merge(key: Array[Byte], value: Array[Byte],
+colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Unit 
= {

Review Comment:
   Also maybe change the indent for all args to be 4 spaces and on new lines 
similar to functions above ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SS][SPARK-46928] Add support for ListState in Arbitrary State API v2. [spark]

2024-02-06 Thread via GitHub


anishshri-db commented on code in PR #44961:
URL: https://github.com/apache/spark/pull/44961#discussion_r1480574441


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##
@@ -316,6 +321,25 @@ class RocksDB(
 }
   }
 
+  def merge(key: Array[Byte], value: Array[Byte],

Review Comment:
   Could we add a function level comment here ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SS][SPARK-46928] Add support for ListState in Arbitrary State API v2. [spark]

2024-02-06 Thread via GitHub


anishshri-db commented on code in PR #44961:
URL: https://github.com/apache/spark/pull/44961#discussion_r1480574178


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##
@@ -316,6 +321,25 @@ class RocksDB(
 }
   }
 
+  def merge(key: Array[Byte], value: Array[Byte],
+colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Unit 
= {
+verifyColFamilyExists(colFamilyName)
+
+if (conf.trackTotalNumberOfRows) {
+  val oldValue = db.get(colFamilyNameToHandleMap(colFamilyName), 
readOptions, key)
+  if (oldValue == null) {
+numKeysOnWritingVersion += 1
+  }
+}
+db.merge(colFamilyNameToHandleMap(colFamilyName), writeOptions, key, value)
+
+if (useColumnFamilies) {
+  changelogWriter.foreach(_.merge(key, value, colFamilyName))
+} else {
+  changelogWriter.foreach(_.merge(key, value))

Review Comment:
   We want to support this for the non-col-families case too ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SS][SPARK-46928] Add support for ListState in Arbitrary State API v2. [spark]

2024-02-06 Thread via GitHub


anishshri-db commented on code in PR #44961:
URL: https://github.com/apache/spark/pull/44961#discussion_r1480573652


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##
@@ -117,6 +118,7 @@ class RocksDB(
   dbOptions.setTableFormatConfig(tableFormatConfig)
   dbOptions.setMaxOpenFiles(conf.maxOpenFiles)
   dbOptions.setAllowFAllocate(conf.allowFAllocate)
+  dbOptions.setMergeOperator(new StringAppendOperator())

Review Comment:
   Oh nice - I already made this change then :) 
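   
   For context, a minimal standalone RocksDB (Java API) illustration of what the 
configured `StringAppendOperator` does on merge. This is demo code, not Spark code; 
the object name and database path are made up for the example:
   
   ```scala
   import org.rocksdb.{Options, RocksDB, StringAppendOperator}
   
   object StringAppendMergeDemo extends App {
     RocksDB.loadLibrary()
   
     // The no-arg StringAppendOperator concatenates merged values with a
     // single-character delimiter (',' by default).
     val options = new Options()
       .setCreateIfMissing(true)
       .setMergeOperator(new StringAppendOperator())
   
     // "/tmp/string-append-demo" is just a scratch path for the demo.
     val db = RocksDB.open(options, "/tmp/string-append-demo")
     try {
       db.merge("fruits".getBytes, "apple".getBytes)
       db.merge("fruits".getBytes, "banana".getBytes)
       // Both merged values are readable from a single get: "apple,banana".
       println(new String(db.get("fruits".getBytes)))
     } finally {
       db.close()
       options.close()
     }
   }
   ```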



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46979][SS] Add support for specifying key and value encoder separately and also for each col family in RocksDB state store provider [spark]

2024-02-06 Thread via GitHub


anishshri-db commented on code in PR #45038:
URL: https://github.com/apache/spark/pull/45038#discussion_r1480570588


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##
@@ -48,54 +50,86 @@ private[sql] class RocksDBStateStoreProvider
 
 override def version: Long = lastVersion
 
-override def createColFamilyIfAbsent(colFamilyName: String): Unit = {
+override def createColFamilyIfAbsent(
+colFamilyName: String,
+keySchema: StructType,
+numColsPrefixKey: Int,
+valueSchema: StructType): Unit = {
   verify(colFamilyName != StateStore.DEFAULT_COL_FAMILY_NAME,
 s"Failed to create column family with reserved_name=$colFamilyName")
+  verify(useColumnFamilies, "Column families are not supported in this 
store")
   rocksDB.createColFamilyIfAbsent(colFamilyName)
+  encoderMapLock.synchronized {
+keyEncoderMap.getOrElseUpdate(colFamilyName,
+  RocksDBStateEncoder.getKeyEncoder(keySchema, numColsPrefixKey))
+
+valueEncoderMap.getOrElseUpdate(colFamilyName,
+  RocksDBStateEncoder.getValueEncoder(valueSchema))
+  }

Review Comment:
   If the entry exists, we won't actually replace it with the current changes. 
But on restart, this can happen. However, for the col family case we need to 
decide the interaction with the state schema compatibility checker anyway, so 
I'd prefer to do this in a separate PR.
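   
   For reference, a tiny standalone illustration (plain Scala collections, not the 
provider code) of the `getOrElseUpdate` behaviour relied on here - an existing 
entry is never replaced:
   
   ```scala
   import scala.collection.mutable
   
   object GetOrElseUpdateDemo extends App {
     // Stand-in string values; in the provider these would be encoder instances.
     val encoders = mutable.HashMap.empty[String, String]
   
     encoders.getOrElseUpdate("cf1", "encoder-built-from-schema-A")
   
     // A second call for the same column family does NOT replace the entry,
     // even if the value it would build has changed.
     val resolved = encoders.getOrElseUpdate("cf1", "encoder-built-from-schema-B")
   
     println(resolved) // prints "encoder-built-from-schema-A"
   }
   ```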



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46979][SS] Add support for specifying key and value encoder separately and also for each col family in RocksDB state store provider [spark]

2024-02-06 Thread via GitHub


anishshri-db commented on code in PR #45038:
URL: https://github.com/apache/spark/pull/45038#discussion_r1480567343


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##
@@ -287,7 +332,13 @@ private[sql] class RocksDBStateStoreProvider
   useColumnFamilies)
   }
 
-  @volatile private var encoder: RocksDBStateEncoder = _
+  private val encoderMapLock = new Object
+
+  @GuardedBy("encoderMapLock")
+  @volatile private var keyEncoderMap = new mutable.HashMap[String, 
RocksDBKeyStateEncoder]
+
+  @GuardedBy("encoderMapLock")
+  @volatile private var valueEncoderMap = new mutable.HashMap[String, 
RocksDBValueStateEncoder]

Review Comment:
   Done



##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##
@@ -287,7 +332,13 @@ private[sql] class RocksDBStateStoreProvider
   useColumnFamilies)
   }
 
-  @volatile private var encoder: RocksDBStateEncoder = _
+  private val encoderMapLock = new Object

Review Comment:
   Removed this



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SS][SPARK-46928] Add support for ListState in Arbitrary State API v2. [spark]

2024-02-06 Thread via GitHub


anishshri-db commented on code in PR #44961:
URL: https://github.com/apache/spark/pull/44961#discussion_r1480564218


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateTypesEncoderUtils.scala:
##
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import org.apache.commons.lang3.SerializationUtils
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.execution.streaming.state.StateStoreErrors
+import org.apache.spark.sql.types.{BinaryType, StructType}
+
+/**
+ * Helper object providing APIs to encodes the grouping key, and user provided 
values
+ * to Spark [[UnsafeRow]].
+ */
+object StateTypesEncoderUtils {
+
+  // TODO: validate places that are trying to encode the key and check if we 
can eliminate/
+  // add caching for some of these calls.
+  def encodeGroupingKey(stateName: String, keyExprEnc: 
ExpressionEncoder[Any]): UnsafeRow = {
+val keyOption = ImplicitGroupingKeyTracker.getImplicitKeyOption
+if (keyOption.isEmpty) {
+  throw StateStoreErrors.implicitKeyNotFound(stateName)
+}
+
+val toRow = keyExprEnc.createSerializer()
+val keyByteArr = toRow
+  .apply(keyOption.get).asInstanceOf[UnsafeRow].getBytes()
+
+val schemaForKeyRow: StructType = new StructType().add("key", BinaryType)

Review Comment:
   Maybe just store these as class/singleton private members once? I wonder 
whether we should just accept this as a function argument?
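   
   A rough sketch of the first option, purely illustrative (the class name, 
constructor shape and projection step are assumptions, not the actual patch) - 
build the serializer, schema and projection once instead of on every call:
   
   ```scala
   import org.apache.spark.sql.catalyst.InternalRow
   import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
   import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
   import org.apache.spark.sql.types.{BinaryType, StructType}
   
   // Hypothetical helper, not part of the PR: caches the per-call allocations.
   class CachedGroupingKeyEncoder(keyExprEnc: ExpressionEncoder[Any]) {
     private val schemaForKeyRow: StructType = new StructType().add("key", BinaryType)
     private val keyProjection = UnsafeProjection.create(schemaForKeyRow)
     private val toRow = keyExprEnc.createSerializer()
   
     def encodeGroupingKey(groupingKey: Any): UnsafeRow = {
       // Serialize the grouping key once, then project its bytes into the
       // single binary-column key row used by the state store.
       val keyByteArr = toRow.apply(groupingKey).asInstanceOf[UnsafeRow].getBytes()
       keyProjection(InternalRow(keyByteArr))
     }
   }
   ```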



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SS][SPARK-46928] Add support for ListState in Arbitrary State API v2. [spark]

2024-02-06 Thread via GitHub


anishshri-db commented on code in PR #44961:
URL: https://github.com/apache/spark/pull/44961#discussion_r1480561627


##
sql/api/src/main/scala/org/apache/spark/sql/streaming/ListState.scala:
##
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.streaming
+
+import org.apache.spark.annotation.{Evolving, Experimental}
+
+@Experimental
+@Evolving
+/**
+ * Interface used for arbitrary stateful operations with the v2 API to capture
+ * list value state.
+ */
+private[sql] trait ListState[S] extends Serializable {
+
+  /** Whether state exists or not. */
+  def exists(): Boolean
+
+  /** Get the state value if it exists */
+  def get(): Iterator[S]
+
+  /** Get the list value as an option if it exists and None otherwise */
+  def getOption(): Option[Iterator[S]]
+
+  /** Update the value of the list. */
+  def put(newState: Array[S]): Unit
+
+  /** Append an entry to the list */
+  def appendValue(newState: S): Unit
+
+  /** Append an entire list to the existing value */
+  def appendList(newState: Array[S]): Unit
+
+  /** Remove this state. */
+  def remove(): Unit

Review Comment:
   Yes please go ahead



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SS][SPARK-46928] Add support for ListState in Arbitrary State API v2. [spark]

2024-02-06 Thread via GitHub


sahnib commented on code in PR #44961:
URL: https://github.com/apache/spark/pull/44961#discussion_r1480557033


##
sql/api/src/main/scala/org/apache/spark/sql/streaming/ListState.scala:
##
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.streaming
+
+import org.apache.spark.annotation.{Evolving, Experimental}
+
+@Experimental
+@Evolving
+/**
+ * Interface used for arbitrary stateful operations with the v2 API to capture
+ * list value state.
+ */
+private[sql] trait ListState[S] extends Serializable {
+
+  /** Whether state exists or not. */
+  def exists(): Boolean
+
+  /** Get the state value if it exists */
+  def get(): Iterator[S]
+
+  /** Get the list value as an option if it exists and None otherwise */
+  def getOption(): Option[Iterator[S]]
+
+  /** Update the value of the list. */
+  def put(newState: Array[S]): Unit
+
+  /** Append an entry to the list */
+  def appendValue(newState: S): Unit
+
+  /** Append an entire list to the existing value */
+  def appendList(newState: Array[S]): Unit
+
+  /** Remove this state. */
+  def remove(): Unit

Review Comment:
   Sounds good. I think we should rename ValueState accordingly then. Do you 
see any concerns with renaming `remove` to `clear` in both? If not, I will go 
ahead and make the change. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SS][SPARK-46928] Add support for ListState in Arbitrary State API v2. [spark]

2024-02-06 Thread via GitHub


sahnib commented on PR #44961:
URL: https://github.com/apache/spark/pull/44961#issuecomment-1930775538

   cc: @HeartSaVioR PTAL, thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46526][SQL] Support LIMIT over correlated subqueries where predicates only reference outer table [spark]

2024-02-06 Thread via GitHub


agubichev commented on PR #44514:
URL: https://github.com/apache/spark/pull/44514#issuecomment-1930746115

   @cloud-fan 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [MINOR][PYTHON] refactor PythonWrite to prepare for supporting python data source streaming write [spark]

2024-02-06 Thread via GitHub


chaoqin-li1123 commented on PR #45049:
URL: https://github.com/apache/spark/pull/45049#issuecomment-1930626019

   @HyukjinKwon @HeartSaVioR @allisonwang-db PTAL, thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[PR] [MINOR][PYTHON] refactor PythonWrite to prepare for supporting python source streaming write [spark]

2024-02-06 Thread via GitHub


chaoqin-li1123 opened a new pull request, #45049:
URL: https://github.com/apache/spark/pull/45049

   
   
   ### What changes were proposed in this pull request?
   Move PythonBatchWrite out of PythonWrite. 
   
   
   ### Why are the changes needed?
   This is to prepare for supporting python data source streaming write in the 
future.
   
   
   ### Does this PR introduce _any_ user-facing change?
   No.
   
   
   ### How was this patch tested?
   Trivial code refactoring, existing test sufficient.
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   No.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46688][SPARK-46691][PYTHON][CONNECT] Support v2 profiling in aggregate Pandas UDFs [spark]

2024-02-06 Thread via GitHub


xinrong-meng commented on PR #45035:
URL: https://github.com/apache/spark/pull/45035#issuecomment-1930607059

   Test failure 
   ```
   FAIL [0.468s]: test_shuffle_data_with_multiple_locations 
(pyspark.tests.test_shuffle.MergerTests)
   --
   Traceback (most recent call last):
 File "/__w/spark/spark/python/pyspark/tests/test_shuffle.py", line 84, in 
test_shuffle_data_with_multiple_locations
   self.assertTrue(
   AssertionError: False is not true
   ```
   is irrelevant to the PR. Let me retrigger the tests.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46979][SS] Add support for specifying key and value encoder separately and also for each col family in RocksDB state store provider [spark]

2024-02-06 Thread via GitHub


sahnib commented on code in PR #45038:
URL: https://github.com/apache/spark/pull/45038#discussion_r1480430719


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##
@@ -48,54 +50,86 @@ private[sql] class RocksDBStateStoreProvider
 
 override def version: Long = lastVersion
 
-override def createColFamilyIfAbsent(colFamilyName: String): Unit = {
+override def createColFamilyIfAbsent(
+colFamilyName: String,
+keySchema: StructType,
+numColsPrefixKey: Int,
+valueSchema: StructType): Unit = {
   verify(colFamilyName != StateStore.DEFAULT_COL_FAMILY_NAME,
 s"Failed to create column family with reserved_name=$colFamilyName")
+  verify(useColumnFamilies, "Column families are not supported in this 
store")
   rocksDB.createColFamilyIfAbsent(colFamilyName)
+  encoderMapLock.synchronized {
+keyEncoderMap.getOrElseUpdate(colFamilyName,
+  RocksDBStateEncoder.getKeyEncoder(keySchema, numColsPrefixKey))
+
+valueEncoderMap.getOrElseUpdate(colFamilyName,
+  RocksDBStateEncoder.getValueEncoder(valueSchema))
+  }

Review Comment:
   Should we throw an exception if the passed key/value schemas are different 
for an existing column family (one that was previously created)? 
   Consider the scenario below:
   
   1. User creates a colFamily with keySchema K, valueSchema V. 
   2. User issues the call again with keySchema K1, valueSchema V1. Note K1 != K, 
or V1 != V.
   3. Now the call in (2) succeeds, but the encoders are using a different schema.
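   
   For illustration, a hedged sketch of the kind of guard this could be (the helper 
name and placement are made up, not the actual provider code):
   
   ```scala
   import java.util.concurrent.ConcurrentHashMap
   
   import org.apache.spark.sql.types.StructType
   
   // Hypothetical helper: remember the schemas a column family was first created
   // with, and fail fast if a later createColFamilyIfAbsent call disagrees.
   class ColFamilySchemaRegistry {
     private val registered = new ConcurrentHashMap[String, (StructType, StructType)]()
   
     def verifyOrRegister(
         colFamilyName: String,
         keySchema: StructType,
         valueSchema: StructType): Unit = {
       val incoming = (keySchema, valueSchema)
       // putIfAbsent returns the previously registered schemas, or null if new.
       val existing = registered.putIfAbsent(colFamilyName, incoming)
       if (existing != null && existing != incoming) {
         throw new IllegalArgumentException(
           s"Column family $colFamilyName already exists with a different key/value schema")
       }
     }
   }
   ```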



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46979][SS] Add support for specifying key and value encoder separately and also for each col family in RocksDB state store provider [spark]

2024-02-06 Thread via GitHub


anishshri-db commented on code in PR #45038:
URL: https://github.com/apache/spark/pull/45038#discussion_r1480427117


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##
@@ -287,7 +332,13 @@ private[sql] class RocksDBStateStoreProvider
   useColumnFamilies)
   }
 
-  @volatile private var encoder: RocksDBStateEncoder = _
+  private val encoderMapLock = new Object

Review Comment:
   Not sure I understand - we would still have to protect the same hashMap 
right ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46979][SS] Add support for specifying key and value encoder separately and also for each col family in RocksDB state store provider [spark]

2024-02-06 Thread via GitHub


sahnib commented on code in PR #45038:
URL: https://github.com/apache/spark/pull/45038#discussion_r1480416349


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##
@@ -287,7 +332,13 @@ private[sql] class RocksDBStateStoreProvider
   useColumnFamilies)
   }
 
-  @volatile private var encoder: RocksDBStateEncoder = _
+  private val encoderMapLock = new Object

Review Comment:
   [nit] rename this to `columnFamilyEncoderMapLock` as it only applies to 
column families. Encoders for the default column family are picked at state 
store init.



##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##
@@ -287,7 +332,13 @@ private[sql] class RocksDBStateStoreProvider
   useColumnFamilies)
   }
 
-  @volatile private var encoder: RocksDBStateEncoder = _
+  private val encoderMapLock = new Object
+
+  @GuardedBy("encoderMapLock")
+  @volatile private var keyEncoderMap = new mutable.HashMap[String, 
RocksDBKeyStateEncoder]
+
+  @GuardedBy("encoderMapLock")
+  @volatile private var valueEncoderMap = new mutable.HashMap[String, 
RocksDBValueStateEncoder]

Review Comment:
   Can we just use a ConcurrentHashMap with signature `Map[String, 
(RocksDBKeyStateEncoder, RocksDBValueStateEncoder)]`? Then we don't need to 
synchronize using a lock. 
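   
   A possible shape for that, sketched against the `createColFamilyIfAbsent` 
signature quoted above (names are illustrative and the verify/error handling is 
omitted; this is a sketch, not the actual change):
   
   ```scala
   // Inside RocksDBStateStoreProvider, so the encoder types are in scope.
   import java.util.concurrent.ConcurrentHashMap
   
   private val keyValueEncoderMap =
     new ConcurrentHashMap[String, (RocksDBKeyStateEncoder, RocksDBValueStateEncoder)]()
   
   override def createColFamilyIfAbsent(
       colFamilyName: String,
       keySchema: StructType,
       numColsPrefixKey: Int,
       valueSchema: StructType): Unit = {
     rocksDB.createColFamilyIfAbsent(colFamilyName)
     // computeIfAbsent is atomic, so concurrent callers settle on a single
     // encoder pair per column family without an external lock.
     keyValueEncoderMap.computeIfAbsent(colFamilyName, _ =>
       (RocksDBStateEncoder.getKeyEncoder(keySchema, numColsPrefixKey),
         RocksDBStateEncoder.getValueEncoder(valueSchema)))
   }
   ```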



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SS][SPARK-46928] Add support for ListState in Arbitrary State API v2. [spark]

2024-02-06 Thread via GitHub


anishshri-db commented on code in PR #44961:
URL: https://github.com/apache/spark/pull/44961#discussion_r1480419508


##
sql/api/src/main/scala/org/apache/spark/sql/streaming/ListState.scala:
##
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.streaming
+
+import org.apache.spark.annotation.{Evolving, Experimental}
+
+@Experimental
+@Evolving
+/**
+ * Interface used for arbitrary stateful operations with the v2 API to capture
+ * list value state.
+ */
+private[sql] trait ListState[S] extends Serializable {
+
+  /** Whether state exists or not. */
+  def exists(): Boolean
+
+  /** Get the state value if it exists */
+  def get(): Iterator[S]
+
+  /** Get the list value as an option if it exists and None otherwise */
+  def getOption(): Option[Iterator[S]]
+
+  /** Update the value of the list. */
+  def put(newState: Array[S]): Unit
+
+  /** Append an entry to the list */
+  def appendValue(newState: S): Unit
+
+  /** Append an entire list to the existing value */
+  def appendList(newState: Array[S]): Unit
+
+  /** Remove this state. */
+  def remove(): Unit

Review Comment:
   Should we rename this to `clear` ? I guess `remove` would be removing an 
element from the set of list values ?
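   
   For concreteness, the trait quoted above with the proposed rename applied (just 
a sketch of the suggestion, not the final API):
   
   ```scala
   // Sketch only: same methods as the quoted ListState, with remove() -> clear().
   private[sql] trait ListState[S] extends Serializable {
     def exists(): Boolean
     def get(): Iterator[S]
     def getOption(): Option[Iterator[S]]
     def put(newState: Array[S]): Unit
     def appendValue(newState: S): Unit
     def appendList(newState: Array[S]): Unit
   
     /** Removes all values for the current grouping key, i.e. clears this state. */
     def clear(): Unit
   }
   ```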



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-43829][CONNECT] Improve SparkConnectPlanner by reuse Dataset and avoid construct new Dataset [spark]

2024-02-06 Thread via GitHub


hvanhovell commented on code in PR #43473:
URL: https://github.com/apache/spark/pull/43473#discussion_r1480406839


##
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##
@@ -115,6 +115,49 @@ class SparkConnectPlanner(
   private lazy val pythonExec =
 sys.env.getOrElse("PYSPARK_PYTHON", 
sys.env.getOrElse("PYSPARK_DRIVER_PYTHON", "python3"))
 
+  // Some relation transform need to create Dataset, then get the logical plan 
from the Dataset.
+  // This method used to reuse the Dataset instead to discard it.
+  def transformRelationAsDataset(rel: proto.Relation): Dataset[Row] = {

Review Comment:
   Is this something you want to work on?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-43829][CONNECT] Improve SparkConnectPlanner by reuse Dataset and avoid construct new Dataset [spark]

2024-02-06 Thread via GitHub


hvanhovell commented on code in PR #43473:
URL: https://github.com/apache/spark/pull/43473#discussion_r1480405679


##
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##
@@ -115,6 +115,49 @@ class SparkConnectPlanner(
   private lazy val pythonExec =
 sys.env.getOrElse("PYSPARK_PYTHON", 
sys.env.getOrElse("PYSPARK_DRIVER_PYTHON", "python3"))
 
+  // Some relation transform need to create Dataset, then get the logical plan 
from the Dataset.
+  // This method used to reuse the Dataset instead to discard it.
+  def transformRelationAsDataset(rel: proto.Relation): Dataset[Row] = {

Review Comment:
   TBH I want to move away from constructing Datasets wholesale. In many cases 
there is no real need, and it is also expensive to do.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-28346][SQL] clone the query plan between analyzer, optimizer and planner [spark]

2024-02-06 Thread via GitHub


dongjoon-hyun commented on PR #25111:
URL: https://github.com/apache/spark/pull/25111#issuecomment-1930461904

   To @MasterDDT, I'd recommend filing an official JIRA issue. Otherwise, it's 
difficult to get any further discussion or help because this thread is too old.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


