Re: [PR] [SPARK-46832][SQL] Introducing Collate and Collation expressions [spark]
MaxGekk commented on code in PR #45064: URL: https://github.com/apache/spark/pull/45064#discussion_r1489031135

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collationExpressions.scala:

@@ -0,0 +1,98 @@
```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.catalyst.expressions

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.ExpressionBuilder
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.util.CollationFactory
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.types._

@ExpressionDescription(
  usage = "_FUNC_(expr, collationName)",
  examples = """
    Examples:
      > SELECT COLLATION('Spark SQL' _FUNC_ 'UCS_BASIC_LCASE');
      UCS_BASIC_LCASE
  """,
  since = "4.0.0",
  group = "string_funcs")
object CollateExpressionBuilder extends ExpressionBuilder {
  override def build(funcName: String, expressions: Seq[Expression]): Expression = {
    expressions match {
      case Seq(e: Expression, collationExpr: Expression) =>
        (collationExpr.dataType, collationExpr.foldable) match {
          case (StringType, true) =>
            val evalCollation = collationExpr.eval()
            if (evalCollation == null) {
              throw QueryCompilationErrors.unexpectedNullError("collation", collationExpr)
            } else {
              Collate(e, collationExpr.eval().toString)
            }
          case (StringType, false) => throw QueryCompilationErrors.nonFoldableArgumentError(
            funcName, "collationName", StringType)
          case (_, _) => throw QueryCompilationErrors.unexpectedInputDataTypeError(
            funcName, 1, StringType, collationExpr)
        }
      case s => throw QueryCompilationErrors.wrongNumArgsError(funcName, Seq(2), s.length)
    }
  }
}

/**
 * An
```

Review Comment: Could you re-use `evalCollation`, please.
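For illustration, the change the reviewer asks for might look like this fragment of the `(StringType, true)` branch (a sketch of the suggested edit, not the merged patch):

```scala
// Sketch: reuse the already-evaluated `evalCollation` instead of calling
// `collationExpr.eval()` a second time.
case (StringType, true) =>
  val evalCollation = collationExpr.eval()
  if (evalCollation == null) {
    throw QueryCompilationErrors.unexpectedNullError("collation", collationExpr)
  } else {
    Collate(e, evalCollation.toString)
  }
```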
Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]
HeartSaVioR commented on PR #45023: URL: https://github.com/apache/spark/pull/45023#issuecomment-1943113395

Could you please check the GA build result and fix accordingly?
Re: [PR] [SPARK-46906][SS] Add a check for stateful operator change for streaming [spark]
HeartSaVioR commented on code in PR #44927: URL: https://github.com/apache/spark/pull/44927#discussion_r1488847231

## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala:

@@ -82,6 +84,39 @@ class IncrementalExecution(
```scala
      .map(SQLConf.SHUFFLE_PARTITIONS.valueConverter)
      .getOrElse(sparkSession.sessionState.conf.numShufflePartitions)

  private def stateCheckpointLocationExists(stateCheckpointLocation: Path): Boolean = {
    val fileManager =
      CheckpointFileManager.create(stateCheckpointLocation, hadoopConf)
    fileManager.exists(stateCheckpointLocation)
  }

  // A map of all (operatorId -> operatorName) in the state metadata
  private lazy val opMapInMetadata: Map[Long, String] = {
    var ret = Map.empty[Long, String]
    if (stateCheckpointLocationExists(new Path(checkpointLocation))) {
```

Review Comment: nit: let's just inline this method if it is referenced only once.

## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala:

@@ -387,8 +433,29 @@ class IncrementalExecution(
```scala
    rulesToCompose.reduceLeft { (ruleA, ruleB) => ruleA orElse ruleB }
  }

  private def checkOperatorValidWithMetadata(): Unit = {
    (opMapInMetadata.keySet ++ opMapInPhysicalPlan.keySet).foreach { opId =>
      val opInMetadata = opMapInMetadata.getOrElse(opId, "not found")
      val opInCurBatch = opMapInPhysicalPlan.getOrElse(opId, "not found")
      if (opInMetadata != opInCurBatch) {
        throw QueryExecutionErrors.statefulOperatorNotMatchInStateMetadataError(
          opMapInMetadata.values.toSeq,
```

Review Comment: Shall we print out the association between opId and opName in the error message? It may be hard to understand what is mismatching with only the opNames.

## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadataSuite.scala:

@@ -215,4 +216,117 @@ class OperatorStateMetadataSuite extends StreamTest with SharedSparkSession {
```scala
    checkError(exc, "STDS_REQUIRED_OPTION_UNSPECIFIED", "42601",
      Map("optionName" -> StateSourceOptions.PATH))
  }

  test("Operator metadata path non-existence should not fail query") {
    withTempDir { checkpointDir =>
      val inputData = MemoryStream[Int]
      val aggregated =
        inputData.toDF()
          .groupBy($"value")
          .agg(count("*"))
          .as[(Int, Long)]

      testStream(aggregated, Complete)(
        StartStream(checkpointLocation = checkpointDir.toString),
        AddData(inputData, 3),
        CheckLastBatch((3, 1)),
        StopStream
      )

      // Delete operator metadata path
      val metadataPath = new Path(checkpointDir.toString, s"state/0/_metadata/metadata")
      val fm = CheckpointFileManager.create(new Path(checkpointDir.getCanonicalPath), hadoopConf)
      fm.delete(metadataPath)

      // Restart the query
      testStream(aggregated, Complete)(
        StartStream(checkpointLocation = checkpointDir.toString),
        AddData(inputData, 3),
        CheckLastBatch((3, 2)),
        StopStream
      )
    }
  }

  test("Changing operator - " +
    "replace, add, remove operators will trigger error with debug message") {
    withTempDir { checkpointDir =>
      val inputData = MemoryStream[Int]
      val stream = inputData.toDF().withColumn("eventTime", timestamp_seconds($"value"))

      testStream(stream.dropDuplicates())(
        StartStream(checkpointLocation = checkpointDir.toString),
        AddData(inputData, 1),
        ProcessAllAvailable(),
        StopStream
      )

      def checkOpChangeError(OpsInMetadataSeq: Seq[String],
          OpsInCurBatchSeq: Seq[String],
```

Review Comment: nit: indentation is off, 4 spaces

## sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala:

@@ -1702,6 +1702,18 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase with ExecutionE
```scala
    new NoSuchElementException("State is either not defined or has already been removed")
  }

  def statefulOperatorNotMatchInStateMetadataError(
      opsInMetadataSeq: Seq[String],
```

Review Comment: Likewise, as I commented above, this should provide information about the association between opId and opName. The opNames alone do not seem to be sufficient.
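For illustration, one way to surface the opId-to-opName association the reviewer asks for, while keeping the `Seq[String]` signature shown in the diff (the exact formatting here is an assumption, not the actual patch):

```scala
// Hypothetical formatting: pass "opId -> opName" pairs instead of bare names
// so the error message shows which operator id is mismatched.
val metadataOps = opMapInMetadata.toSeq.sortBy(_._1)
  .map { case (id, name) => s"$id -> $name" }
val planOps = opMapInPhysicalPlan.toSeq.sortBy(_._1)
  .map { case (id, name) => s"$id -> $name" }
throw QueryExecutionErrors.statefulOperatorNotMatchInStateMetadataError(metadataOps, planOps)
```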
Re: [PR] [SPARK-47036][SS] Cleanup RocksDB file tracking for previously uploaded files if files were deleted from local directory [spark]
HeartSaVioR commented on code in PR #45092: URL: https://github.com/apache/spark/pull/45092#discussion_r1488831105

## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:

@@ -1863,6 +1864,91 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
```scala
    }
  }

  test("ensure local files deleted on filesystem" +
    " are cleaned from dfs file mapping") {
    def getSSTFiles(dir: File): Set[File] = {
      val sstFiles = new mutable.HashSet[File]()
      dir.listFiles().foreach { f =>
        if (f.isDirectory) {
          sstFiles ++= getSSTFiles(f)
        } else {
          if (f.getName.endsWith(".sst")) {
            sstFiles.add(f)
          }
        }
      }
      sstFiles.toSet
    }

    def filterAndDeleteSSTFiles(dir: File, filesToKeep: Set[File]): Unit = {
      dir.listFiles().foreach { f =>
        if (f.isDirectory) {
          filterAndDeleteSSTFiles(f, filesToKeep)
        } else {
          if (!filesToKeep.contains(f) && f.getName.endsWith(".sst")) {
            logInfo(s"deleting ${f.getAbsolutePath} from local directory")
            f.delete()
          }
        }
      }
    }

    withTempDir { dir =>
      withTempDir { localDir =>
        val sqlConf = new SQLConf()
        val dbConf = RocksDBConf(StateStoreConf(sqlConf))
        logInfo(s"config set to ${dbConf.compactOnCommit}")
        val hadoopConf = new Configuration()
        val remoteDir = dir.getCanonicalPath
        withDB(remoteDir = remoteDir,
          conf = dbConf,
          hadoopConf = hadoopConf,
          localDir = localDir) { db =>
          db.load(0)
          db.put("a", "1")
          db.put("b", "1")
          db.commit()
          // upload snapshots (with changelog checkpointing)
```

Review Comment: nit: with/without - I expect this to be running with both changelog checkpointing on and off.

Review Comment: or just remove mentioning the part `with changelog checkpointing`.
Re: [PR] [SPARK-45396][PYTHON] Add doc entry for `pyspark.ml.connect` module, and adds `Evaluator` to `__all__` at `ml.connect` [spark]
HeartSaVioR commented on PR #43210: URL: https://github.com/apache/spark/pull/43210#issuecomment-1942967890

The error message I've seen was the following:
```
[autosummary] failed to import 'pyspark.ml.connect.classification.LogisticRegression': no module named pyspark.ml.connect.classification.LogisticRegression
```
But adding the modules to `__all__` in `__init__.py` did not seem to work as I expected. Maybe I'm missing something.
Re: [PR] [SPARK-45396][PYTHON] Add doc entry for `pyspark.ml.connect` module, and adds `Evaluator` to `__all__` at `ml.connect` [spark]
HeartSaVioR commented on PR #43210: URL: https://github.com/apache/spark/pull/43210#issuecomment-1942961137

It seems like the pyspark docs build is failing due to this, during running the release script against branch-3.5. I can see the docs build pass after reverting this commit. It's really odd, as it has been passing in GitHub Actions - I checked with commits in branch-3.5. I suspect the difference may come from different python/apt library versioning (the docker container for the release), but I have no clear idea about this.

@HyukjinKwon @WeichenXu123 What'd be the better way to move forward? Shall I revert this for branch-3.5, or could someone help look at this one and move the change forward?
Re: [PR] [SPARK-46820][PYTHON] Fix error message regression by restoring `new_msg` [spark]
itholic commented on code in PR #44859: URL: https://github.com/apache/spark/pull/44859#discussion_r1488780632

## python/pyspark/sql/types.py:

@@ -2214,12 +2211,9 @@ def verify_acceptable_types(obj: Any) -> None:
```
        # subclass of them can not be fromInternal in JVM
        if type(obj) not in _acceptable_types[_type]:
            raise PySparkTypeError(
-               error_class="CANNOT_ACCEPT_OBJECT_IN_TYPE",
-               message_parameters={
-                   "data_type": str(dataType),
-                   "obj_name": str(obj),
-                   "obj_type": type(obj).__name__,
-               },
```

Review Comment: CI finally turned green.
Re: [PR] [SPARK-46858][PYTHON][PS][BUILD] Upgrade Pandas to 2.2.0 [spark]
itholic commented on PR #44881: URL: https://github.com/apache/spark/pull/44881#issuecomment-1942942082

Yeah, Pandas 2.2.0 fixes many bugs and brings a couple of behavior changes. Let me fix them. Thanks for the confirmation!
Re: [PR] [SPARK-44445][BUILD][TESTS] Use `org.seleniumhq.selenium.htmlunit3-driver` instead of `net.sourceforge.htmlunit` [spark]
dongjoon-hyun commented on PR #45079: URL: https://github.com/apache/spark/pull/45079#issuecomment-1942919284

You're welcome. Feel free to ping me again on this PR. I'll be here today for support.
Re: [PR] [SPARK-44445][BUILD][TESTS] Use `org.seleniumhq.selenium.htmlunit3-driver` instead of `net.sourceforge.htmlunit` [spark]
jingz-db commented on PR #45079: URL: https://github.com/apache/spark/pull/45079#issuecomment-1942918594

> BTW, in the community, we trust CIs as the ground truth.

This makes sense, I am double-checking. Thanks for the quick response!
Re: [PR] [SPARK-44445][BUILD][TESTS] Use `org.seleniumhq.selenium.htmlunit3-driver` instead of `net.sourceforge.htmlunit` [spark]
dongjoon-hyun commented on PR #45079: URL: https://github.com/apache/spark/pull/45079#issuecomment-1942911238

BTW, in the community, we trust CIs as the ground truth. Does your GitHub Action also fail like you mentioned?
Re: [PR] [SPARK-44445][BUILD][TESTS] Use `org.seleniumhq.selenium.htmlunit3-driver` instead of `net.sourceforge.htmlunit` [spark]
dongjoon-hyun commented on PR #45079: URL: https://github.com/apache/spark/pull/45079#issuecomment-1942910281

Could you clear up your Maven or Ivy cache?
Re: [PR] [SPARK-44445][BUILD][TESTS] Use `org.seleniumhq.selenium.htmlunit3-driver` instead of `net.sourceforge.htmlunit` [spark]
jingz-db commented on PR #45079: URL: https://github.com/apache/spark/pull/45079#issuecomment-1942909715

I just tried `build/sbt clean package` and then `build/sbt "sql/testOnly org.apache.spark.sql.execution.python.PythonDataSourceSuite"`; it still gives the same error as above. And I am also on the latest master commit:
```
git log --oneline -n1
63b97c6ad8 (HEAD -> master, origin/master, origin/HEAD) [SPARK-46979][SS] Add support for specifying key and value encoder separately and also for each col family in RocksDB state store provider
```
Re: [PR] [SPARK-44445][BUILD][TESTS] Use `org.seleniumhq.selenium.htmlunit3-driver` instead of `net.sourceforge.htmlunit` [spark]
dongjoon-hyun commented on PR #45079: URL: https://github.com/apache/spark/pull/45079#issuecomment-1942909458

Also, to @cloud-fan and @HyukjinKwon , could you double-check with @jingz-db and @chaoqin-li1123 ? I can help you if there is a reproducible example in Apache Spark master branch.
Re: [PR] [SPARK-44445][BUILD][TESTS] Use `org.seleniumhq.selenium.htmlunit3-driver` instead of `net.sourceforge.htmlunit` [spark]
dongjoon-hyun commented on PR #45079: URL: https://github.com/apache/spark/pull/45079#issuecomment-1942908662

To @jingz-db and @chaoqin-li1123 , are you sure that you are using Apache Spark `master` instead of `Databricks` master?
Re: [PR] [SPARK-44445][BUILD][TESTS] Use `org.seleniumhq.selenium.htmlunit3-driver` instead of `net.sourceforge.htmlunit` [spark]
dongjoon-hyun commented on PR #45079: URL: https://github.com/apache/spark/pull/45079#issuecomment-1942907749

I also tried the following. It succeeded like the following too.
```
$ build/sbt
...
sbt:spark-parent> testOnly org.apache.spark.sql.execution.python.PythonDataSourceSuite
...
[info] Run completed in 24 seconds, 532 milliseconds.
[info] Total number of tests run: 21
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 21, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
...
[success] Total time: 57 s, completed Feb 13, 2024, 4:36:04 PM
sbt:spark-parent>
```
Re: [PR] [SPARK-44445][BUILD][TESTS] Use `org.seleniumhq.selenium.htmlunit3-driver` instead of `net.sourceforge.htmlunit` [spark]
dongjoon-hyun commented on PR #45079: URL: https://github.com/apache/spark/pull/45079#issuecomment-1942903356

For the record, the following is the result from the Apache Spark master branch.
```
$ git log --oneline -n1
63b97c6ad82 (HEAD -> master, apache/master, apache/HEAD) [SPARK-46979][SS] Add support for specifying key and value encoder separately and also for each col family in RocksDB state store provider

$ build/sbt "sql/testOnly org.apache.spark.sql.execution.python.PythonDataSourceSuite"
...
[info] Run completed in 21 seconds, 342 milliseconds.
[info] Total number of tests run: 21
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 21, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
[success] Total time: 37 s, completed Feb 13, 2024, 4:29:57 PM
```
Re: [PR] [SPARK-44445][BUILD][TESTS] Use `org.seleniumhq.selenium.htmlunit3-driver` instead of `net.sourceforge.htmlunit` [spark]
dongjoon-hyun commented on PR #45079: URL: https://github.com/apache/spark/pull/45079#issuecomment-1942900495

So, something like this?
```
$ build/sbt "sql/testOnly org.apache.spark.sql.execution.python.PythonDataSourceSuite"
```
Re: [PR] [SPARK-44445][BUILD][TESTS] Use `org.seleniumhq.selenium.htmlunit3-driver` instead of `net.sourceforge.htmlunit` [spark]
chaoqin-li1123 commented on PR #45079: URL: https://github.com/apache/spark/pull/45079#issuecomment-1942899902

`build/sbt` to enter the sbt shell, and `testOnly org.apache.spark.sql.execution.python.PythonDataSourceSuite` to run the test within the shell. @dongjoon-hyun
Re: [PR] [SPARK-44445][BUILD][TESTS] Use `org.seleniumhq.selenium.htmlunit3-driver` instead of `net.sourceforge.htmlunit` [spark]
dongjoon-hyun commented on PR #45079: URL: https://github.com/apache/spark/pull/45079#issuecomment-1942897094

Ur, a full command please, @chaoqin-li1123 .
Re: [PR] [SPARK-44445][BUILD][TESTS] Use `org.seleniumhq.selenium.htmlunit3-driver` instead of `net.sourceforge.htmlunit` [spark]
chaoqin-li1123 commented on PR #45079: URL: https://github.com/apache/spark/pull/45079#issuecomment-1942896374

Thanks @dongjoon-hyun. My command is:
```
build/sbt
> testOnly org.apache.spark.sql.execution.python.PythonDataSourceSuite
```
Re: [PR] [SPARK-44445][BUILD][TESTS] Use `org.seleniumhq.selenium.htmlunit3-driver` instead of `net.sourceforge.htmlunit` [spark]
dongjoon-hyun commented on PR #45079: URL: https://github.com/apache/spark/pull/45079#issuecomment-1942895237

I can help you when you provide a reproducible procedure.
Re: [PR] [SPARK-44445][BUILD][TESTS] Use `org.seleniumhq.selenium.htmlunit3-driver` instead of `net.sourceforge.htmlunit` [spark]
dongjoon-hyun commented on PR #45079: URL: https://github.com/apache/spark/pull/45079#issuecomment-1942894624

Please give me a reproducible command line, @jingz-db . :)
Re: [PR] [SPARK-44445][BUILD][TESTS] Use `org.seleniumhq.selenium.htmlunit3-driver` instead of `net.sourceforge.htmlunit` [spark]
jingz-db commented on PR #45079: URL: https://github.com/apache/spark/pull/45079#issuecomment-1942893943

Hi @dongjoon-hyun, a similar error also happens in my local env, with the errors below:
```
[error] /Users/jing.zhan/spark/core/src/test/scala/org/apache/spark/deploy/history/ChromeUIHistoryServerSuite.scala:37:29: Class org.openqa.selenium.remote.AbstractDriverOptions not found - continuing with a stub.
[error]   val chromeOptions = new ChromeOptions
[error]                           ^
[error] /Users/jing.zhan/spark/core/src/test/scala/org/apache/spark/deploy/history/ChromeUIHistoryServerSuite.scala:39:21: Class org.openqa.selenium.remote.RemoteWebDriver not found - continuing with a stub.
[error]     webDriver = new ChromeDriver(chromeOptions)
[error]                     ^
[error] /Users/jing.zhan/spark/core/src/test/scala/org/apache/spark/deploy/history/ChromeUIHistoryServerSuite.scala:39:17: type mismatch;
[error]  found   : org.openqa.selenium.chrome.ChromeDriver
[error]  required: org.openqa.selenium.WebDriver
[error]     webDriver = new ChromeDriver(chromeOptions)
[error]                 ^
[error] /Users/jing.zhan/spark/core/src/test/scala/org/apache/spark/ui/ChromeUISeleniumSuite.scala:38:17: type mismatch;
[error]  found   : org.openqa.selenium.chrome.ChromeDriver
[error]  required: org.openqa.selenium.WebDriver with org.openqa.selenium.JavascriptExecutor
[error]     webDriver = new ChromeDriver(chromeOptions)
[error]                 ^
[warn] one warning found
[error] four errors found
```
Re: [PR] [SPARK-47014][PYTHON][CONNECT] Implement methods dumpPerfProfiles and dumpMemoryProfiles of SparkSession [spark]
ueshin commented on code in PR #45073: URL: https://github.com/apache/spark/pull/45073#discussion_r1488746220

## python/pyspark/sql/profiler.py:

@@ -158,6 +159,70 @@ def _profile_results(self) -> "ProfileResults":
```python
        """
        ...

    def dump_perf_profiles(self, path: str, id: Optional[int] = None) -> None:
```

Review Comment: Sounds good.
Re: [PR] [SPARK-44445][BUILD][TESTS] Use `org.seleniumhq.selenium.htmlunit3-driver` instead of `net.sourceforge.htmlunit` [spark]
dongjoon-hyun commented on PR #45079: URL: https://github.com/apache/spark/pull/45079#issuecomment-1942893118

What is your command, @chaoqin-li1123 ?
Re: [PR] [SPARK-44445][BUILD][TESTS] Use `org.seleniumhq.selenium.htmlunit3-driver` instead of `net.sourceforge.htmlunit` [spark]
chaoqin-li1123 commented on PR #45079: URL: https://github.com/apache/spark/pull/45079#issuecomment-1942892329

It seems that this commit breaks my sbt build on the latest master branch. The error message is `spark/core/src/test/scala/org/apache/spark/deploy/history/ChromeUIHistoryServerSuite.scala:37:29: Class org.openqa.selenium.remote.AbstractDriverOptions not found`. @dongjoon-hyun
Re: [PR] [Don't merge & review] verify sbt on master [spark]
github-actions[bot] commented on PR #43079: URL: https://github.com/apache/spark/pull/43079#issuecomment-1942891730

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!
Re: [PR] [SPARK-45782][CORE][PYTHON] Add Dataframe API df.explainString() [spark]
github-actions[bot] closed pull request #43651: [SPARK-45782][CORE][PYTHON] Add Dataframe API df.explainString() URL: https://github.com/apache/spark/pull/43651
[PR] [SS] Add MapState implementation for State API v2. [spark]
jingz-db opened a new pull request, #45094: URL: https://github.com/apache/spark/pull/45094

### What changes were proposed in this pull request?
This PR adds changes for the MapState implementation in State API v2. The implementation adds a new encoder/decoder that encodes the grouping key and the user key into a composite key to be put into RocksDB, so that we can retrieve the key-value pair for a user-specified user key with a single RocksDB get.

### Why are the changes needed?
These changes are needed to support map values in the State Store. The changes are part of the work around adding a new stateful streaming operator for arbitrary state management that provides a bunch of new features listed in the SPIP JIRA here - https://issues.apache.org/jira/browse/SPARK-45939

### Does this PR introduce _any_ user-facing change?
Yes. This PR introduces a new state type (MapState) that users can use in their Spark streaming queries.

### How was this patch tested?
Unit tests in `TransformWithMapStateSuite`.

### Was this patch authored or co-authored using generative AI tooling?
No
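The composite key mentioned in the description could be built with a simple length-prefixed binary layout, as in this sketch (an assumption for illustration; the PR's actual encoder may differ):

```scala
import java.nio.ByteBuffer

// Sketch: pack (groupingKey, userKey) into one binary RocksDB key with a
// length prefix, so a single `get` can address one user key within a group.
def encodeCompositeKey(groupingKey: Array[Byte], userKey: Array[Byte]): Array[Byte] = {
  val buf = ByteBuffer.allocate(4 + groupingKey.length + userKey.length)
  buf.putInt(groupingKey.length) // prefix lets the decoder split the two parts
  buf.put(groupingKey)
  buf.put(userKey)
  buf.array()
}

def decodeCompositeKey(bytes: Array[Byte]): (Array[Byte], Array[Byte]) = {
  val buf = ByteBuffer.wrap(bytes)
  val groupingKey = new Array[Byte](buf.getInt())
  buf.get(groupingKey)
  val userKey = new Array[Byte](buf.remaining())
  buf.get(userKey)
  (groupingKey, userKey)
}
```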
Re: [PR] [SPARK-47014][PYTHON][CONNECT] Implement methods dumpPerfProfiles and dumpMemoryProfiles of SparkSession [spark]
xinrong-meng commented on code in PR #45073: URL: https://github.com/apache/spark/pull/45073#discussion_r1488693924

## python/pyspark/sql/profiler.py:

@@ -158,6 +159,70 @@ def _profile_results(self) -> "ProfileResults":
```python
        """
        ...

    def dump_perf_profiles(self, path: str, id: Optional[int] = None) -> None:
```

Review Comment: Good point! V1 has `sc.dump_profiles/show_profiles` for both perf and memory profiling. V2 has `spark.dumpPerfProfiles` and `spark.dumpMemoryProfiles` for perf and memory profiling separately. It would be more consistent and user-friendly to introduce a uniform interface for both, like `spark.profile.dump/show`. Let me create a ticket for now. What's your thought on that @ueshin?
Re: [PR] [SPARK-47014][PYTHON][CONNECT] Implement methods dumpPerfProfiles and dumpMemoryProfiles of SparkSession [spark]
xinrong-meng commented on code in PR #45073: URL: https://github.com/apache/spark/pull/45073#discussion_r1488687912

## python/pyspark/sql/profiler.py:

@@ -158,6 +159,70 @@
```python
    def dump_perf_profiles(self, path: str, id: Optional[int] = None) -> None:
        """
        Dump the perf profile results into directory `path`.

        .. versionadded:: 4.0.0

        Parameters
        ----------
        path: str
            A directory in which to dump the perf profile.
        id : int, optional
            A UDF ID to be shown. If not specified, all the results will be shown.
        """
        with self._lock:
            stats = self._perf_profile_results

        def dump(path: str, id: int) -> None:
            s = stats.get(id)

            if s is not None:
                if not os.path.exists(path):
                    os.makedirs(path)
                p = os.path.join(path, "udf_%d.pstats" % id)
```

Review Comment (on `p = os.path.join(path, "udf_%d.pstats" % id)`): Makes sense! I'll reuse `f"udf_{id}_memory.txt"` for memory profiles for backward compatibility.

Review Comment (on `def dump(path: str, id: int) -> None:`): Good catch!
[PR] [SPARK-47037] Fix AliasAwareOutputExpression outputPartitioning [spark]
liorregev opened a new pull request, #45093: URL: https://github.com/apache/spark/pull/45093

AliasAwareOutputExpression does not detect that `select(F.struct($"my_field"))` retains partitioning when the dataset was partitioned by `$"my_field"` before the select. This causes an additional shuffle to be added when using `joinWith` on datasets that were already partitioned accordingly.

### What changes were proposed in this pull request?
AliasAwareOutputExpression should respect struct fields when returning `outputPartitioning`.

### Why are the changes needed?
Extra shuffles are bad and slow down my pipeline. Would like them gone please.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added a unit test that covers the scenario.

### Was this patch authored or co-authored using generative AI tooling?
No
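A sketch of the scenario being described, under an assumed, illustrative setup (column and variable names are made up; a local SparkSession is created just to make the snippet self-contained):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.struct

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Both inputs are pre-partitioned by my_field.
val left = spark.range(100).withColumnRenamed("id", "my_field").repartition($"my_field")
val right = spark.range(100).withColumnRenamed("id", "my_field").repartition($"my_field")

// After wrapping the partitioning column in a struct, outputPartitioning
// should still be derivable from the struct's field...
val l = left.select(struct($"my_field").as("key"))
val r = right.select(struct($"my_field").as("key"))

// ...but without the fix, this join plans an extra shuffle even though both
// sides were already hash-partitioned on my_field.
l.join(r, "key").explain()
```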
Re: [PR] [SPARK-46979][SS] Add support for specifying key and value encoder separately and also for each col family in RocksDB state store provider [spark]
HeartSaVioR closed pull request #45038: [SPARK-46979][SS] Add support for specifying key and value encoder separately and also for each col family in RocksDB state store provider URL: https://github.com/apache/spark/pull/45038
Re: [PR] [SPARK-46979][SS] Add support for specifying key and value encoder separately and also for each col family in RocksDB state store provider [spark]
HeartSaVioR commented on PR #45038: URL: https://github.com/apache/spark/pull/45038#issuecomment-1942570003

Thanks! Merging to master.
Re: [PR] [SS][SPARK-47036] Cleanup RocksDB file tracking for previously uploaded files if files were deleted from local directory [spark]
sahnib commented on PR #45092: URL: https://github.com/apache/spark/pull/45092#issuecomment-1942560146

cc: @HeartSaVioR PTAL, thanks!
Re: [PR] [SPARK-46832][SQL] Introducing Collate and Collation expressions [spark]
MaxGekk commented on code in PR #45064: URL: https://github.com/apache/spark/pull/45064#discussion_r1488560783

## common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java:

@@ -1410,6 +1422,13 @@ public boolean equals(final Object other) {
```java
    }
  }

  /**
   * Collation-aware equality comparison of two UTF8String.
   */
  public boolean semanticEquals(final UTF8String other, int collationId) {
    return CollationFactory.fetchCollation(collationId).equalsFunction.apply(this, other);
```

Review Comment:
> Would it be ok to do benchmarks as a follow up PR?

Let's do that independently from the PR.
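For context, a small usage sketch of the API quoted above. The mapping of collation id 1 to `UCS_BASIC_LCASE` is taken from the PR's own tests (`StringType(1)` for `ucs_basic_lcase`), so treat it as an assumption of this example:

```scala
import org.apache.spark.unsafe.types.UTF8String

val a = UTF8String.fromString("Spark")
val b = UTF8String.fromString("SPARK")

a.equals(b)            // false: default binary equality
// Assuming collation id 1 is UCS_BASIC_LCASE, as in the PR's tests:
a.semanticEquals(b, 1) // true under the lowercase (case-insensitive) collation
```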
Re: [PR] [SPARK-46832][SQL] Introducing Collate and Collation expressions [spark]
dbatomic commented on code in PR #45064: URL: https://github.com/apache/spark/pull/45064#discussion_r1488536286

## common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java:

@@ -1410,6 +1422,13 @@ public boolean equals(final Object other) {
```java
    }
  }

  /**
   * Collation-aware equality comparison of two UTF8String.
   */
  public boolean semanticEquals(final UTF8String other, int collationId) {
    return CollationFactory.fetchCollation(collationId).equalsFunction.apply(this, other);
```

Review Comment: Would it be ok to do benchmarks as a follow-up PR? We definitely need a benchmark strategy for both regular string ops and collation special cases. The closest we have so far in this space is `CharVarcharBenchmark`, which is not really what we need. IMO, this PR is getting a bit too large for my taste, so I would prefer to split it into smaller parts. What do you think?
[PR] [SS] Cleanup RocksDB file tracking for previously uploaded files if files were deleted from local directory [spark]
sahnib opened a new pull request, #45092: URL: https://github.com/apache/spark/pull/45092

### What changes were proposed in this pull request?
This change cleans up any dangling files tracked as previously uploaded if they were cleaned up from the filesystem. The cleanup can happen due to a compaction racing in parallel with a commit, where the compaction completes after the commit and an older version is loaded on the same executor.

### Why are the changes needed?
The changes are needed to prevent RocksDB versionId mismatch errors (which require users to clean the checkpoint directory and retry the query). A particular scenario where this can happen is provided below:

1. Version V1 is loaded on executor A; the RocksDB State Store has the 195.sst, 196.sst, 197.sst and 198.sst files.
2. State changes are made, which result in the creation of a new table file, 200.sst.
3. The state store is committed as version V2. The SST file 200.sst (as 000200-8c80161a-bc23-4e3b-b175-cffe38e427c7.sst) is uploaded to DFS, and the previous 4 files are reused. A new metadata file is created to track the exact SST files with unique IDs, and uploaded with the RocksDB Manifest as part of V1.zip.
4. RocksDB compaction is triggered at the same time. The compaction creates a new L1 file (201.sst), and deletes the existing 5 SST files.
5. The Spark stage is retried.
6. Version V1 is reloaded on the same executor. The local files are inspected, and 201.sst is deleted. The 4 SST files in version V1 are downloaded again to the local file system.
7. Any local files which are deleted (as part of the version load) are also removed from the local → DFS file upload tracking. **However, the files already deleted as a result of compaction are not removed from tracking. This is the bug which resulted in the failure.**
8. The state store is committed as version V2. However, the local mapping of SST files to DFS file paths still has 200.sst in its tracking, hence the SST file is not re-uploaded. A new metadata file is created to track the exact SST files with unique IDs, and uploaded with the new RocksDB Manifest as part of V2.zip. (The V2.zip file is overwritten here atomically.)
9. A new executor tries to load version V2. However, the SST files in (1) are now incompatible with the Manifest file in (6), resulting in the versionId mismatch failure.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added unit test cases to cover the scenario where some files were deleted on the file system. The test case fails with the existing master with the error `Mismatch in unique ID on table file 16`, and is successful with the changes in this PR.

### Was this patch authored or co-authored using generative AI tooling?
No
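A simplified sketch of the cleanup idea (the map and method names here are illustrative assumptions, not the PR's actual identifiers):

```scala
import java.io.File
import scala.collection.mutable

// Illustrative: tracking of local SST file name -> DFS path it was uploaded to.
val localToDfsFile = mutable.Map[String, String]()

// On version load, drop tracking entries whose local file no longer exists
// (e.g. deleted by a compaction racing with commit), so a later commit
// re-uploads them instead of assuming they are already present on DFS.
def pruneDanglingUploads(localDir: File): Unit = {
  val present = Option(localDir.listFiles()).getOrElse(Array.empty).map(_.getName).toSet
  localToDfsFile.filterInPlace { case (localName, _) => present.contains(localName) }
}
```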
Re: [PR] [SPARK-47035][SS][CONNECT] Protocol for Client-Side Listener [spark]
WweiL commented on PR #45091: URL: https://github.com/apache/spark/pull/45091#issuecomment-1942363588

@grundprinzip
[PR] [SPARK-47035][SS][CONNECT] Protocol for Client-Side Listener [spark]
WweiL opened a new pull request, #45091: URL: https://github.com/apache/spark/pull/45091

### What changes were proposed in this pull request?
Currently, the StreamingQueryListener for Connect runs on the server side. From a customer's point of view, the purpose of a listener is mainly to have a push mechanism that notifies them when queries start / end / make progress. The server-side listener essentially loses this functionality, and we have found that internally there is a need for a client-side listener. The new listener will run on the client side, with the server continuously pushing new listener events to the client, and the client calling the corresponding callback functions for the different listener event types. This is the first PR, which defines the protocol of this new listener.

### Why are the changes needed?
Add a client-side listener, which makes more sense.

### Does this PR introduce _any_ user-facing change?
Not for this one.

### How was this patch tested?
No tests needed for this one; new tests will be added later.

### Was this patch authored or co-authored using generative AI tooling?
No
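As an illustration of the client-side callback routing this protocol enables, a hedged sketch (the event and trait names below are invented for the example and are not the proto definitions in this PR):

```scala
// Invented event model for illustration: the server streams listener events;
// the client deserializes each one and routes it to the matching callback.
sealed trait ListenerEvent
final case class QueryStartedEvent(queryId: String) extends ListenerEvent
final case class QueryProgressEvent(progressJson: String) extends ListenerEvent
final case class QueryTerminatedEvent(queryId: String) extends ListenerEvent

trait ClientSideListener {
  def onQueryStarted(e: QueryStartedEvent): Unit
  def onQueryProgress(e: QueryProgressEvent): Unit
  def onQueryTerminated(e: QueryTerminatedEvent): Unit
}

def dispatch(event: ListenerEvent, listener: ClientSideListener): Unit = event match {
  case e: QueryStartedEvent    => listener.onQueryStarted(e)
  case e: QueryProgressEvent   => listener.onQueryProgress(e)
  case e: QueryTerminatedEvent => listener.onQueryTerminated(e)
}
```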
Re: [PR] [SPARK-46832][SQL] Introducing Collate and Collation expressions [spark]
dbatomic commented on code in PR #45064: URL: https://github.com/apache/spark/pull/45064#discussion_r1488515575

## sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala:

@@ -0,0 +1,136 @@
```scala
package org.apache.spark.sql

import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.util.CollationFactory
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.StringType

class CollationSuite extends QueryTest with SharedSparkSession {
  test("collate returns proper type") {
    Seq("ucs_basic", "ucs_basic_lcase", "unicode", "unicode_ci").foreach { collationName =>
      checkAnswer(sql(s"select 'aaa' collate '$collationName'"), Row("aaa"))
      val collationId = CollationFactory.collationNameToId(collationName)
      assert(sql(s"select 'aaa' collate '$collationName'").schema(0).dataType
        == StringType(collationId))
    }
  }

  test("collation name is case insensitive") {
    Seq("uCs_BasIc", "uCs_baSic_Lcase", "uNicOde", "UNICODE_ci").foreach { collationName =>
      checkAnswer(sql(s"select 'aaa' collate '$collationName'"), Row("aaa"))
      val collationId = CollationFactory.collationNameToId(collationName)
      assert(sql(s"select 'aaa' collate '$collationName'").schema(0).dataType
        == StringType(collationId))
    }
  }

  test("collation expression returns name of collation") {
    Seq("ucs_basic", "ucs_basic_lcase", "unicode", "unicode_ci").foreach { collationName =>
      checkAnswer(
        sql(s"select collation('aaa' collate '$collationName')"), Row(collationName.toUpperCase()))
    }
  }

  test("collate function syntax") {
    assert(sql(s"select collate('aaa', 'ucs_basic')").schema(0).dataType == StringType(0))
    assert(sql(s"select collate('aaa', 'ucs_basic_lcase')").schema(0).dataType == StringType(1))
  }

  test("collate function syntax invalid args") {
    Seq("'aaa','a','b'", "'aaa'").foreach(args => {
```

Review Comment: Yes! Done.
Re: [PR] [SPARK-46832][SQL] Introducing Collate and Collation expressions [spark]
dbatomic commented on code in PR #45064: URL: https://github.com/apache/spark/pull/45064#discussion_r1488515363

## sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala:
## @@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.catalyst.util.CollationFactory
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.StringType
+
+class CollationSuite extends QueryTest with SharedSparkSession {
+  test("collate returns proper type") {
+    Seq("ucs_basic", "ucs_basic_lcase", "unicode", "unicode_ci").foreach { collationName =>
+      checkAnswer(sql(s"select 'aaa' collate '$collationName'"), Row("aaa"))
+      val collationId = CollationFactory.collationNameToId(collationName)
+      assert(sql(s"select 'aaa' collate '$collationName'").schema(0).dataType
+        == StringType(collationId))
+    }
+  }
+
+  test("collation name is case insensitive") {
+    Seq("uCs_BasIc", "uCs_baSic_Lcase", "uNicOde", "UNICODE_ci").foreach { collationName =>
+      checkAnswer(sql(s"select 'aaa' collate '$collationName'"), Row("aaa"))
+      val collationId = CollationFactory.collationNameToId(collationName)
+      assert(sql(s"select 'aaa' collate '$collationName'").schema(0).dataType
+        == StringType(collationId))
+    }
+  }
+
+  test("collation expression returns name of collation") {
+    Seq("ucs_basic", "ucs_basic_lcase", "unicode", "unicode_ci").foreach { collationName =>
+      checkAnswer(
+        sql(s"select collation('aaa' collate '$collationName')"), Row(collationName.toUpperCase()))
+    }
+  }
+
+  test("collate function syntax") {
+    assert(sql(s"select collate('aaa', 'ucs_basic')").schema(0).dataType == StringType(0))
+    assert(sql(s"select collate('aaa', 'ucs_basic_lcase')").schema(0).dataType == StringType(1))
+  }
+
+  test("collate function syntax invalid args") {
+    Seq("'aaa','a','b'", "'aaa'").foreach(args => {
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql(s"select collate($args)")
+        },
+        errorClass = "WRONG_NUM_ARGS.WITHOUT_SUGGESTION",
+        sqlState = "42605",
+        parameters = Map(
+          "functionName" -> "`collate`",
+          "expectedNum" -> "2",
+          "actualNum" -> args.split(',').length.toString,
+          "docroot" -> "https://spark.apache.org/docs/latest"),
+        context = ExpectedContext(
+          fragment = s"collate($args)",
+          start = 7,
+          stop = 15 + args.length)
+      )
+    })
+  }
+
+  test("collation expression returns default collation") {
+    checkAnswer(sql(s"select collation('aaa')"), Row("UCS_BASIC"))
+  }
+
+  test("invalid collation name throws exception") {
+    checkError(
+      exception = intercept[SparkException] { sql("select 'aaa' collate 'UCS_BASIS'") },

Review Comment:
   Right! Thanks. Fixed + added a test.
Re: [PR] [SS][SPARK-46928] Add support for ListState in Arbitrary State API v2. [spark]
sahnib commented on code in PR #44961: URL: https://github.com/apache/spark/pull/44961#discussion_r1488459138

## sql/api/src/main/scala/org/apache/spark/sql/streaming/ValueState.scala:
## @@ -46,5 +46,5 @@ private[sql] trait ValueState[S] extends Serializable {
   def update(newState: S): Unit
 
   /** Remove this state. */
-  def remove(): Unit
+  def clear(): Unit

Review Comment:
   Yes, this was discussed with @anishshri-db, and `remove` is renamed to `clear` on both ValueState and ListState.
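As a usage illustration of the rename, here is a minimal self-contained sketch. The trait is mirrored locally because the real one is package-private; `exists()` and `get()` are assumptions based on the `ListState` counterpart in this PR, while only `update` and the renamed `clear` appear in the diff above.

```scala
// Mirrors the (package-private) trait from the diff above so the sketch
// compiles on its own; members beyond update/clear are assumed.
trait ValueState[S] extends Serializable {
  def exists(): Boolean          // assumed accessor
  def get(): S                   // assumed accessor
  def update(newState: S): Unit  // from the diff above
  def clear(): Unit              // renamed from remove() in this PR
}

// Hypothetical call site: a running count whose handle is supplied by the
// transformWithState processor runtime.
class RunningCount(state: ValueState[Long]) {
  def add(): Unit = {
    val current = if (state.exists()) state.get() else 0L
    state.update(current + 1)
  }

  def reset(): Unit = state.clear()
}
```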
Re: [PR] [SPARK-47023][BUILD] Upgrade `aircompressor` to 1.26 [spark]
dongjoon-hyun commented on PR #45084: URL: https://github.com/apache/spark/pull/45084#issuecomment-1942113291

Since the RC1 vote failed, I backported this to branch-3.5.
Re: [PR] [SPARK-46906][SS] Add a check for stateful operator change for streaming [spark]
jingz-db commented on code in PR #44927: URL: https://github.com/apache/spark/pull/44927#discussion_r1488336417

## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala:
## @@ -184,6 +185,41 @@ class IncrementalExecution(
     }
   }
 
+  /**

Review Comment:
   Thanks for the review, Jungtaek! I also like the idea of adding a map.

   > So the check can be also done after executing physical planning rules, maybe at the end of state.apply()

   I tried adding the check after `WriteStatefulOperatorMetadataRule`, but that would miss the case where an operator is added after restart (because the additional operator has already been written to the metadata). So I keep the check before `WriteStatefulOperatorMetadataRule` and skip it when the metadata is empty. It is also worth noting that if we do not perform the check before writing to the metadata and then fail the query, inaccurate info would be written to the state metadata.
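A minimal sketch of the check being described, under the assumption that both the checkpoint metadata and the current physical plan can be summarized as maps from operator id to operator name; none of the names below are from the actual PR.

```scala
// Illustrative only: validate the restarted plan's stateful operators against
// what the checkpoint metadata recorded, skipping the check when no metadata
// exists yet (e.g. a fresh query or a pre-metadata checkpoint).
object StatefulOperatorCheckSketch {
  def checkOperatorChange(
      opsInMetadata: Map[Long, String],   // operatorId -> operator name, from checkpoint
      opsInCurBatch: Map[Long, String]    // operatorId -> operator name, current plan
  ): Unit = {
    // Empty metadata: nothing to validate against, so omit the check.
    if (opsInMetadata.isEmpty) return

    opsInMetadata.foreach { case (id, name) =>
      opsInCurBatch.get(id) match {
        case Some(`name`) => // same operator survived the restart, OK
        case other =>
          throw new IllegalStateException(
            s"Stateful operator '$name' (id=$id) from checkpoint metadata was " +
            s"removed or replaced after restart (found: $other).")
      }
    }
  }
}
```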
Re: [PR] [SPARK-47028][SQL][TESTS] Check `SparkUnsupportedOperationException` instead of `UnsupportedOperationException` [spark]
MaxGekk closed pull request #45082: [SPARK-47028][SQL][TESTS] Check `SparkUnsupportedOperationException` instead of `UnsupportedOperationException` URL: https://github.com/apache/spark/pull/45082
Re: [PR] [SPARK-47028][SQL][TESTS] Check `SparkUnsupportedOperationException` instead of `UnsupportedOperationException` [spark]
MaxGekk commented on PR #45082: URL: https://github.com/apache/spark/pull/45082#issuecomment-1941167022

Merging to master. Thank you, @LuciferYang for review.
Re: [PR] [WIP][SPARK-46858][PYTHON][PS][BUILD] Upgrade Pandas to 2.2.0 [spark]
itholic commented on code in PR #44881: URL: https://github.com/apache/spark/pull/44881#discussion_r1487597019

## python/pyspark/pandas/frame.py:
## @@ -10607,7 +10607,9 @@ def melt(
                 name_like_string(name) if name is not None else "variable_{}".format(i)
                 for i, name in enumerate(self._internal.column_label_names)
             ]
-        elif isinstance(var_name, str):
+        elif is_list_like(var_name):
+            raise ValueError(f"{var_name=} must be a scalar.")
+        else:

Review Comment:
   Fixed from: https://github.com/pandas-dev/pandas/pull/55948
Re: [PR] [SPARK-46832][SQL] Introducing Collate and Collation expressions [spark]
MaxGekk commented on code in PR #45064: URL: https://github.com/apache/spark/pull/45064#discussion_r1487518940

## sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala:
## @@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.catalyst.util.CollationFactory
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.StringType
+
+class CollationSuite extends QueryTest with SharedSparkSession {
+  test("collate returns proper type") {
+    Seq("ucs_basic", "ucs_basic_lcase", "unicode", "unicode_ci").foreach { collationName =>
+      checkAnswer(sql(s"select 'aaa' collate '$collationName'"), Row("aaa"))
+      val collationId = CollationFactory.collationNameToId(collationName)
+      assert(sql(s"select 'aaa' collate '$collationName'").schema(0).dataType
+        == StringType(collationId))
+    }
+  }
+
+  test("collation name is case insensitive") {
+    Seq("uCs_BasIc", "uCs_baSic_Lcase", "uNicOde", "UNICODE_ci").foreach { collationName =>
+      checkAnswer(sql(s"select 'aaa' collate '$collationName'"), Row("aaa"))
+      val collationId = CollationFactory.collationNameToId(collationName)
+      assert(sql(s"select 'aaa' collate '$collationName'").schema(0).dataType
+        == StringType(collationId))
+    }
+  }
+
+  test("collation expression returns name of collation") {
+    Seq("ucs_basic", "ucs_basic_lcase", "unicode", "unicode_ci").foreach { collationName =>
+      checkAnswer(
+        sql(s"select collation('aaa' collate '$collationName')"), Row(collationName.toUpperCase()))
+    }
+  }
+
+  test("collate function syntax") {
+    assert(sql(s"select collate('aaa', 'ucs_basic')").schema(0).dataType == StringType(0))
+    assert(sql(s"select collate('aaa', 'ucs_basic_lcase')").schema(0).dataType == StringType(1))
+  }
+
+  test("collate function syntax invalid args") {
+    Seq("'aaa','a','b'", "'aaa'").foreach(args => {
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql(s"select collate($args)")
+        },
+        errorClass = "WRONG_NUM_ARGS.WITHOUT_SUGGESTION",
+        sqlState = "42605",
+        parameters = Map(
+          "functionName" -> "`collate`",
+          "expectedNum" -> "2",
+          "actualNum" -> args.split(',').length.toString,
+          "docroot" -> "https://spark.apache.org/docs/latest"),
+        context = ExpectedContext(
+          fragment = s"collate($args)",
+          start = 7,
+          stop = 15 + args.length)
+      )
+    })
+  }
+
+  test("collation expression returns default collation") {
+    checkAnswer(sql(s"select collation('aaa')"), Row("UCS_BASIC"))
+  }
+
+  test("invalid collation name throws exception") {
+    checkError(
+      exception = intercept[SparkException] { sql("select 'aaa' collate 'UCS_BASIS'") },

Review Comment:
   Could you check `NULL` as the collation name:
   ```scala
   sql("select collate('aaa', CAST(NULL AS STRING))")
   ```
   Users might get a NPE:
   ```java
   Caused by: java.lang.NullPointerException: Cannot invoke "Object.toString()" because the return value of "org.apache.spark.sql.catalyst.expressions.Expression.eval(org.apache.spark.sql.catalyst.InternalRow)" is null
   	at org.apache.spark.sql.catalyst.expressions.CollateExpressionBuilder$.build(collationExpressions.scala:40)
   	at org.apache.spark.sql.catalyst.expressions.CollateExpressionBuilder$.build(collationExpressions.scala:36)
   ```

## sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala:
## @@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to
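A sketch of the regression test this comment is asking for, assuming the builder gains a null check (via `QueryCompilationErrors.unexpectedNullError`) so that analysis fails cleanly instead of throwing an NPE; the asserted message fragment is an illustrative assumption.

```scala
// Inside CollationSuite: verify a NULL collation name is rejected at analysis.
test("null collation name fails analysis instead of throwing an NPE") {
  val e = intercept[AnalysisException] {
    sql("select collate('aaa', CAST(NULL AS STRING))")
  }
  // The exact error class depends on QueryCompilationErrors.unexpectedNullError.
  assert(e.getMessage.contains("collation"))
}
```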
Re: [PR] [SPARK-46832][SQL] Introducing Collate and Collation expressions [spark]
MaxGekk commented on code in PR #45064: URL: https://github.com/apache/spark/pull/45064#discussion_r1487489877

## common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java:
## @@ -1410,6 +1422,13 @@ public boolean equals(final Object other) {
     }
   }
 
+  /**
+   * Collation-aware equality comparison of two UTF8String.
+   */
+  public boolean semanticEquals(final UTF8String other, int collationId) {
+    return CollationFactory.fetchCollation(collationId).equalsFunction.apply(this, other);

Review Comment:
   It would be nice to add some benchmarks for string ops. @dbatomic Could you re-check the existing benchmarks:
   ```
   find . -name "*Benchmark.scala" | wc -l
   76
   ```
   Maybe there are some relevant benchmarks already.
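For illustration, a rough sketch of such a string-ops micro-benchmark in the style of the existing `*Benchmark.scala` files. Collation ids 0 and 1 follow the `StringType(0)`/`StringType(1)` mapping in the tests quoted earlier; everything else is an assumption, not code from this PR.

```scala
import org.apache.spark.benchmark.Benchmark
import org.apache.spark.unsafe.types.UTF8String

// Compares the cost of semanticEquals under a binary collation versus a
// case-insensitive one, on a pair that differs only in case.
object CollationEqualityBenchmark {
  def main(args: Array[String]): Unit = {
    val iters = 10000000L
    val left = UTF8String.fromString("Spark SQL")
    val right = UTF8String.fromString("spark sql")
    val benchmark = new Benchmark("UTF8String.semanticEquals", iters)

    benchmark.addCase("UCS_BASIC (binary equality)") { _ =>
      var i = 0L
      while (i < iters) { left.semanticEquals(right, 0); i += 1 }
    }
    benchmark.addCase("UCS_BASIC_LCASE (lowercase equality)") { _ =>
      var i = 0L
      while (i < iters) { left.semanticEquals(right, 1); i += 1 }
    }
    benchmark.run()
  }
}
```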
Re: [PR] [SS][SPARK-46928] Add support for ListState in Arbitrary State API v2. [spark]
HeartSaVioR commented on code in PR #44961: URL: https://github.com/apache/spark/pull/44961#discussion_r1487446994

## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
## @@ -67,6 +67,16 @@ trait ReadStateStore {
   def get(key: UnsafeRow, colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME): UnsafeRow
 
+  /**
+   * Provides an iterator containing all values for a particular key. The values are merged

Review Comment:
   We can put the detail in the actual state store (provider) class.
Re: [PR] [SS][SPARK-46928] Add support for ListState in Arbitrary State API v2. [spark]
HeartSaVioR commented on code in PR #44961: URL: https://github.com/apache/spark/pull/44961#discussion_r1487255434

## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala:
## @@ -78,7 +78,7 @@ class StatePartitionReader(
     StateStoreProvider.createAndInit(
       stateStoreProviderId, keySchema, valueSchema, numColsPrefixKey,
-      useColumnFamilies = false, storeConf, hadoopConf.value)
+      useColumnFamilies = false, storeConf, hadoopConf.value, useMultipleValuesPerKey = false)

Review Comment:
   Shall we file a JIRA ticket to support transformWithState with the state data source reader? From what I understand, we are deferring support for this. Please correct me if I'm missing something.

## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ListStateImpl.scala:
## @@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.execution.streaming.state.{StateStore, StateStoreErrors}
+import org.apache.spark.sql.streaming.ListState
+
+/**
+ * Provides concrete implementation for list of values associated with a state variable
+ * used in the streaming transformWithState operator.
+ *
+ * @param store - reference to the StateStore instance to be used for storing state
+ * @param stateName - name of logical state partition
+ * @tparam S - data type of object that will be stored in the list
+ */
+class ListStateImpl[S](store: StateStore,
+    stateName: String,
+    keyExprEnc: ExpressionEncoder[Any])
+  extends ListState[S] with Logging {
+
+  /** Whether state exists or not. */
+  override def exists(): Boolean = {
+    val encodedGroupingKey = StateTypesEncoderUtils.encodeGroupingKey(stateName, keyExprEnc)
+    val stateValue = store.get(encodedGroupingKey, stateName)
+    stateValue != null
+  }
+
+  /** Get the state value if it exists. If the state does not exist in state store, an
+   * empty iterator is returned. */
+  override def get(): Iterator[S] = {
+    val encodedKey = StateTypesEncoderUtils.encodeGroupingKey(stateName, keyExprEnc)
+    val unsafeRowValuesIterator = store.valuesIterator(encodedKey, stateName)
+    new Iterator[S] {
+      override def hasNext: Boolean = {
+        unsafeRowValuesIterator.hasNext
+      }
+
+      override def next(): S = {
+        val valueUnsafeRow = unsafeRowValuesIterator.next()
+        StateTypesEncoderUtils.decodeValue(valueUnsafeRow)
+      }
+    }
+  }
+
+  /** Get the list value as an option if it exists and None otherwise. */
+  override def getOption(): Option[Iterator[S]] = {
+    Option(get())
+  }
+
+  /** Update the value of the list. */
+  override def put(newState: Array[S]): Unit = {
+    validateNewState(newState)
+
+    if (newState.isEmpty) {
+      this.clear()

Review Comment:
   I feel like there might be some use case to distinguish an empty list from no value. Is this coupled with a technical issue (e.g. we don't allow an empty list as a value at all), or is it just a matter of UX?

## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
## @@ -67,6 +67,16 @@ trait ReadStateStore {
   def get(key: UnsafeRow, colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME): UnsafeRow
 
+  /**
+   * Provides an iterator containing all values for a particular key. The values are merged

Review Comment:
   The method doc seems to be too tied to the implementation, specifically the RocksDB state store provider. Shall we simply state the method contract without the detail here? Please have a look at other existing methods.

## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ListStateImpl.scala:
## @@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the
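To make the empty-list question above concrete: with the `put` implementation quoted earlier, an empty list is indistinguishable from absent state. A hypothetical caller's view, assuming a `ListState[String]` handle named `listState` supplied by the processor runtime:

```scala
// REPL-style fragment illustrating the semantics of the quoted put().
listState.put(Array("a", "b"))
assert(listState.exists())            // true: two values are stored

listState.put(Array.empty[String])    // the quoted implementation calls this.clear()
assert(!listState.exists())           // the "empty list" reads back as "no state"
```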
Re: [PR] [SPARK-47028][SQL][TESTS] Check `SparkUnsupportedOperationException` instead of `UnsupportedOperationException` [spark]
MaxGekk commented on PR #45082: URL: https://github.com/apache/spark/pull/45082#issuecomment-1940685375

@panbingkun @srielau @LuciferYang @beliefer @cloud-fan Could you review this PR, please.