Re: [PR] [SPARK-47045][SQL] Replace `IllegalArgumentException` by `SparkIllegalArgumentException` in `sql/api` [spark]
MaxGekk closed pull request #45098: [SPARK-47045][SQL] Replace `IllegalArgumentException` by `SparkIllegalArgumentException` in `sql/api` URL: https://github.com/apache/spark/pull/45098 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47045][SQL] Replace `IllegalArgumentException` by `SparkIllegalArgumentException` in `sql/api` [spark]
MaxGekk commented on PR #45098: URL: https://github.com/apache/spark/pull/45098#issuecomment-1945501972 Merging to master. Thank you, @srielau, for the review.
[PR] [SPARK-47054][PYTHON][TESTS] Remove pinned version of torch for Python 3.12 support [spark]
HyukjinKwon opened a new pull request, #45113: URL: https://github.com/apache/spark/pull/45113

### What changes were proposed in this pull request?
This PR unpins the version of torch in our CI.

### Why are the changes needed?
Testing the latest version. This also blocks SPARK-46078.

### Does this PR introduce _any_ user-facing change?
No, dev-only.

### How was this patch tested?
Manually tested via `./dev/lint-python`.

### Was this patch authored or co-authored using generative AI tooling?
No.
Re: [PR] [SPARK-47015][Collation] Disable partitioning on collated columns [spark]
cloud-fan commented on PR #45104: URL: https://github.com/apache/spark/pull/45104#issuecomment-1945425646 How about bucket columns? We generate the bucket id from the string value and assume that all semantically-equal string values generate the same bucket id, which isn't true for strings with collation.
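The concern above can be sketched quickly. This is not Spark's actual bucketing code; the hash function and toy case-insensitive collation below are illustrative assumptions, but they show why byte-based bucket ids break once a collation declares two different byte sequences equal:

```python
# Illustration (not Spark's implementation): bucket ids derived from a hash
# of the raw string bytes, versus equality under a collation.
import hashlib

def bucket_id(value: str, num_buckets: int) -> int:
    # A byte-based bucketing scheme hashes the raw UTF-8 bytes.
    digest = hashlib.md5(value.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big") % num_buckets

def ci_equal(a: str, b: str) -> bool:
    # Toy case-insensitive collation: compare lowercased forms.
    return a.lower() == b.lower()

a, b = "Spark", "SPARK"
assert ci_equal(a, b)  # equal under the collation...
# ...but the byte-based bucket ids are computed from different bytes,
# so they can land in different buckets:
print(bucket_id(a, 8), bucket_id(b, 8))
```

A collation-aware scheme would have to hash a collation key (e.g. the lowercased form here) rather than the raw bytes, or partitioning/bucketing on such columns must be disabled, as this PR proposes.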
Re: [PR] [SPARK-47009][SQL] Enable create table support for collation [spark]
cloud-fan commented on PR #45105: URL: https://github.com/apache/spark/pull/45105#issuecomment-1945424385 We should include more high-level information: what is the corresponding Parquet type for a string with collation, and how do we fix the Parquet max/min column stats?
Re: [PR] [SPARK-47009][SQL] Enable create table support for collation [spark]
cloud-fan commented on code in PR #45105: URL: https://github.com/apache/spark/pull/45105#discussion_r1490442977

## sql/api/src/main/scala/org/apache/spark/sql/types/DataType.scala: ##

```
@@ -117,6 +117,7 @@ object DataType {
   private val FIXED_DECIMAL = """decimal\(\s*(\d+)\s*,\s*(\-?\d+)\s*\)""".r
   private val CHAR_TYPE = """char\(\s*(\d+)\s*\)""".r
   private val VARCHAR_TYPE = """varchar\(\s*(\d+)\s*\)""".r
+  private val COLLATED_STRING_TYPE = """string\(\s*([\w_]+)\s*\)""".r
```

Review Comment: since it's a DDL string, shall we follow the actual SQL syntax and use `STRING COLLATE name`?
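The two candidate DDL spellings can be contrasted with a quick regex sketch. The `PAREN_STYLE` pattern mirrors the regex in the diff; `SQL_STYLE` is my illustrative guess at a pattern for the `STRING COLLATE name` form the reviewer suggests, not Spark's actual parser:

```python
# Sketch of the two DDL spellings discussed in the review.
import re

# Paren style, as in the PR's regex: string(UNICODE_CI)
PAREN_STYLE = re.compile(r"string\(\s*([\w_]+)\s*\)", re.IGNORECASE)
# SQL-syntax style suggested in the review: STRING COLLATE UNICODE_CI
SQL_STYLE = re.compile(r"string\s+collate\s+([\w_]+)", re.IGNORECASE)

m1 = PAREN_STYLE.fullmatch("string(UNICODE_CI)")
m2 = SQL_STYLE.fullmatch("STRING COLLATE UNICODE_CI")
print(m1.group(1), m2.group(1))  # both capture the collation name: UNICODE_CI UNICODE_CI
```

Both forms are mechanically parseable; the review point is that the DDL string should round-trip with the SQL syntax users actually write.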
Re: [PR] [SPARK-47053][INFRA][3.5] Bump python libraries (pandas, pyarrow) in Docker image for release script [spark]
HeartSaVioR closed pull request #45111: [SPARK-47053][INFRA][3.5] Bump python libraries (pandas, pyarrow) in Docker image for release script URL: https://github.com/apache/spark/pull/45111
Re: [PR] [SPARK-47053][INFRA][3.5] Bump python libraries (pandas, pyarrow) in Docker image for release script [spark]
HeartSaVioR commented on PR #45111: URL: https://github.com/apache/spark/pull/45111#issuecomment-1945420722 This was merged via [9b4778f](https://github.com/apache/spark/commit/9b4778fc1dc7047635c9ec19c187d4e75d182590)
Re: [PR] [SPARK-46687][TESTS][PYTHON][FOLLOW-UP] Skip MemoryProfilerParityTests when codecov enabled [spark]
HyukjinKwon commented on PR #45112: URL: https://github.com/apache/spark/pull/45112#issuecomment-1945413337 Thx! I manually ran the linters, and the other tests won't be verified here, so I'm just merging. Merged to master.
Re: [PR] [SPARK-46687][TESTS][PYTHON][FOLLOW-UP] Skip MemoryProfilerParityTests when codecov enabled [spark]
HyukjinKwon closed pull request #45112: [SPARK-46687][TESTS][PYTHON][FOLLOW-UP] Skip MemoryProfilerParityTests when codecov enabled URL: https://github.com/apache/spark/pull/45112
Re: [PR] [SPARK-46906][INFRA][3.5] Bump python libraries (pandas, pyarrow) in Docker image for release script [spark]
HeartSaVioR commented on PR #45111: URL: https://github.com/apache/spark/pull/45111#issuecomment-1945412391 Let me merge this to continue the release steps. The 3.5.1 RC will start with 3.5.1 RC2.
Re: [PR] [SPARK-46687][TESTS][PYTHON] Skip MemoryProfilerParityTests when codecov enabled [spark]
HyukjinKwon commented on PR #45112: URL: https://github.com/apache/spark/pull/45112#issuecomment-1945409614 cc @xinrong-meng @ueshin
[PR] [SPARK-46687][TESTS][PYTHON] Skip MemoryProfilerParityTests when codecov enabled [spark]
HyukjinKwon opened a new pull request, #45112: URL: https://github.com/apache/spark/pull/45112

### What changes were proposed in this pull request?
This is a followup of https://github.com/apache/spark/pull/44775 that skips the tests when codecov is on. They fail now (https://github.com/apache/spark/actions/runs/7709423681/job/21010676103) and the coverage report is broken.

### Why are the changes needed?
To recover the test coverage report.

### Does this PR introduce _any_ user-facing change?
No, test-only.

### How was this patch tested?
Manually tested.

### Was this patch authored or co-authored using generative AI tooling?
No.
Re: [PR] [SPARK-47009][Collation] Enable create table support for collation [spark]
cloud-fan commented on code in PR #45105: URL: https://github.com/apache/spark/pull/45105#discussion_r1490415076

## sql/api/src/main/scala/org/apache/spark/sql/catalyst/parser/DataTypeAstBuilder.scala: ##

```
@@ -58,8 +59,8 @@ class DataTypeAstBuilder extends SqlBaseParserBaseVisitor[AnyRef] {
    * Resolve/create a primitive type.
    */
   override def visitPrimitiveDataType(ctx: PrimitiveDataTypeContext): DataType = withOrigin(ctx) {
-    val typeName = ctx.`type`.start.getType
-    (typeName, ctx.INTEGER_VALUE().asScala.toList) match {
+    val typeCtx = ctx.`type`
+    (typeCtx.start.getType, ctx.INTEGER_VALUE().asScala.toList) match {
```

Review Comment: cc @hvanhovell @HyukjinKwon, since we put the SQL parser in the Spark Connect Scala client component, adding new data types means a client version upgrade.
Re: [PR] [SPARK-47009][Collation] Enable create table support for collation [spark]
cloud-fan commented on code in PR #45105: URL: https://github.com/apache/spark/pull/45105#discussion_r1490413649

## sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4: ##

```
@@ -1095,6 +1095,10 @@ colPosition
     : position=FIRST | position=AFTER afterCol=errorCapturingIdentifier
     ;
+collateExpression
```

Review Comment: maybe `collateClause`? This is not really an expression.
Re: [PR] [SPARK-47044] Add executed query for JDBC external datasources to explain output [spark]
dtenedor commented on code in PR #45102: URL: https://github.com/apache/spark/pull/45102#discussion_r1490403653

## sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcSQLQueryBuilder.scala: ##

```
@@ -81,17 +81,20 @@ class JdbcSQLQueryBuilder(dialect: JdbcDialect, options: JDBCOptions) {
   /**
    * Constructs the WHERE clause that following dialect's SQL syntax.
    */
-  def withPredicates(predicates: Array[Predicate], part: JDBCPartition): JdbcSQLQueryBuilder = {
+  def withPredicates(predicates: Array[Predicate],
```

Review Comment: please start these arguments on the next line, indented by +4 spaces each.
Re: [PR] [SPARK-46820][PYTHON] Fix error message regression by restoring `new_msg` [spark]
HyukjinKwon commented on code in PR #44859: URL: https://github.com/apache/spark/pull/44859#discussion_r1490402927

## python/pyspark/sql/types.py: ##

```
@@ -2197,8 +2197,10 @@ def verify_nullability(obj: Any) -> bool:
                 return True
             else:
                 raise PySparkValueError(
-                    error_class="CANNOT_BE_NONE",
-                    message_parameters={"arg_name": "obj"},
+                    error_class="FIELD_NOT_NULLABLE",
+                    message_parameters={
+                        "field_name": name if name is not None else "",
```

Review Comment: Seems like the error message would look weird if this is an empty string.
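The reviewer's concern can be seen with a tiny sketch. The `render_field_not_nullable` function below is a hypothetical stand-in for PySpark's error-message templating (only the `field_name` parameter comes from the diff); it shows how an empty field name renders oddly:

```python
# Hypothetical renderer for a FIELD_NOT_NULLABLE-style message template,
# to illustrate what happens when the field name falls back to "".
def render_field_not_nullable(field_name: str) -> str:
    return f"Field {field_name!r} is not nullable, but got None."

print(render_field_not_nullable(""))     # reads oddly: Field '' is not nullable, but got None.
print(render_field_not_nullable("age"))  # Field 'age' is not nullable, but got None.
```

A fallback such as a dedicated "top-level value" wording would avoid the empty-quotes artifact.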
Re: [PR] [SPARK-47051][INFRA] Create a new test pipeline for `yarn` and `connect` [spark]
dongjoon-hyun commented on PR #45107: URL: https://github.com/apache/spark/pull/45107#issuecomment-1945383372 Merged to master for now.
Re: [PR] [SPARK-47051][INFRA] Create a new test pipeline for `yarn` and `connect` [spark]
dongjoon-hyun closed pull request #45107: [SPARK-47051][INFRA] Create a new test pipeline for `yarn` and `connect` URL: https://github.com/apache/spark/pull/45107
Re: [PR] [SPARK-47051][INFRA] Create a new test pipeline for `yarn` and `connect` [spark]
dongjoon-hyun commented on code in PR #45107: URL: https://github.com/apache/spark/pull/45107#discussion_r1490390398

## .github/workflows/build_and_test.yml: ##

```
@@ -147,8 +147,9 @@ jobs:
             mllib-local, mllib, graphx
           - >-
             streaming, sql-kafka-0-10, streaming-kafka-0-10, streaming-kinesis-asl,
-            yarn, kubernetes, hadoop-cloud, spark-ganglia-lgpl,
-            connect, protobuf
+            kubernetes, hadoop-cloud, spark-ganglia-lgpl, protobuf
+          - >-
+            yarn, connect
```

Review Comment: From my side, we don't use YARN and Spark Connect.
Re: [PR] [SPARK-47051][INFRA] Create a new test pipeline for `yarn` and `connect` [spark]
dongjoon-hyun commented on code in PR #45107: URL: https://github.com/apache/spark/pull/45107#discussion_r1490389222

## .github/workflows/build_and_test.yml: ##

```
@@ -147,8 +147,9 @@ jobs:
             mllib-local, mllib, graphx
           - >-
             streaming, sql-kafka-0-10, streaming-kafka-0-10, streaming-kinesis-asl,
-            yarn, kubernetes, hadoop-cloud, spark-ganglia-lgpl,
-            connect, protobuf
+            kubernetes, hadoop-cloud, spark-ganglia-lgpl, protobuf
+          - >-
+            yarn, connect
```

Review Comment: Thanks! I hoped so too. For now, the `SparkSessionE2ESuite` flakiness and `YarnClusterSuite` seem to be abandoned.
Re: [PR] [SPARK-47051][INFRA] Create a new test pipeline for `yarn` and `connect` [spark]
HyukjinKwon commented on code in PR #45107: URL: https://github.com/apache/spark/pull/45107#discussion_r1490374178

## .github/workflows/build_and_test.yml: ##

```
@@ -147,8 +147,9 @@ jobs:
             mllib-local, mllib, graphx
           - >-
             streaming, sql-kafka-0-10, streaming-kafka-0-10, streaming-kinesis-asl,
-            yarn, kubernetes, hadoop-cloud, spark-ganglia-lgpl,
-            connect, protobuf
+            kubernetes, hadoop-cloud, spark-ganglia-lgpl, protobuf
+          - >-
+            yarn, connect
```

Review Comment: hmmm.. I think we'd actually better fix the flakiness. I am fine with this as a temporary change.
[PR] [SPARK-46906][INFRA][3.5] Bump python libraries (pandas, pyarrow) in Docker image for release script [spark]
HeartSaVioR opened a new pull request, #45111: URL: https://github.com/apache/spark/pull/45111

### What changes were proposed in this pull request?
This PR proposes to bump python libraries (pandas to 2.0.3, pyarrow to 4.0.0) in the Docker image for the release script.

### Why are the changes needed?
Without this change, the release script (do-release-docker.sh) fails on the docs phase. Changing this fixes the release process against branch-3.5.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Confirmed with a dry-run of the release script against branch-3.5.

### Was this patch authored or co-authored using generative AI tooling?
No.
Re: [PR] [SPARK-46906][INFRA] Bump python libraries (pandas, pyarrow) in Docker image for release script [spark]
HeartSaVioR commented on PR #45110: URL: https://github.com/apache/spark/pull/45110#issuecomment-1945364343 The docs phase against the master branch + this PR failed "before" the python docs build:

```
Your bundle is locked to sass-embedded (1.69.7) from rubygems repository
https://rubygems.org/ or installed locally, but that version can no longer
be found in that source. That means the author of sass-embedded (1.69.7)
has removed it. You'll need to update your bundle to a version other than
sass-embedded (1.69.7) that hasn't been removed in order to install.
```

Rebasing this PR to the master branch doesn't seem to work as expected. I'll submit a PR for branch-3.5, as the cherry-pick has a conflict anyway.
Re: [PR] [SS][SPARK-46928] Add support for ListState in Arbitrary State API v2. [spark]
HeartSaVioR commented on code in PR #44961: URL: https://github.com/apache/spark/pull/44961#discussion_r1490354597

## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala: ##

```
@@ -65,6 +65,43 @@ private[sql] class RocksDBStateStoreProvider
       value
     }

+    override def valuesIterator(key: UnsafeRow, colFamilyName: String): Iterator[UnsafeRow] = {
+      verify(key != null, "Key cannot be null")
+      verify(encoder.supportsMultipleValuesPerKey, "valuesIterator requires an encoder " +
+        "that supports multiple values for a single key.")
+      val valueIterator = encoder.decodeValues(rocksDB.get(encoder.encodeKey(key), colFamilyName))
+
+      if (!isValidated && valueIterator.nonEmpty) {
```

Review Comment: Discussed offline. There have been a couple of cases where inspecting the underlying state rows was helpful for debugging. Maybe we could remove this once we improve the state data source reader to produce state keys/values as binary, but until then let's keep it as it is in the existing codebase.
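The "multiple values per key" shape that `valuesIterator` decodes can be illustrated with a toy encoding. This is not Spark's RocksDB encoder; the length-prefixed layout below is an assumed, simplified scheme that just shows how several values round-trip through one stored blob per key:

```python
# Toy sketch: several values for one key stored as a single blob of
# length-prefixed entries, with an iterator that decodes them back.
import struct

def encode_values(values):
    # Each entry: 4-byte big-endian length, then the raw value bytes.
    return b"".join(struct.pack(">I", len(v)) + v for v in values)

def decode_values(blob):
    # Walk the blob, yielding one value per length-prefixed entry.
    pos = 0
    while pos < len(blob):
        (n,) = struct.unpack_from(">I", blob, pos)
        pos += 4
        yield blob[pos:pos + n]
        pos += n

blob = encode_values([b"a", b"bc"])
print(list(decode_values(blob)))  # [b'a', b'bc']
```

The review's validation hook would sit inside the decoding iterator, inspecting the first decoded value before handing rows back to the caller.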
Re: [PR] [SPARK-46906][SS] Add a check for stateful operator change for streaming [spark]
HeartSaVioR commented on PR #44927: URL: https://github.com/apache/spark/pull/44927#issuecomment-1945339250 @jingz-db Mind retriggering GA? You can either manually do this in your fork or simply push an empty commit to do this automatically. Thanks!
Re: [PR] [SPARK-46906][INFRA] Bump python libraries (pandas, pyarrow) in Docker image for release script [spark]
HeartSaVioR commented on PR #45110: URL: https://github.com/apache/spark/pull/45110#issuecomment-1945338259 I'm now running the release script with a dry-run against the master branch. I will update the PR description once it works for the master branch as well. Otherwise I'll rebase this PR against branch-3.5.
[PR] [SPARK-46906][INFRA] Bump python libraries (pandas, pyarrow) in Docker image for release script [spark]
HeartSaVioR opened a new pull request, #45110: URL: https://github.com/apache/spark/pull/45110

### What changes were proposed in this pull request?
This PR proposes to bump python libraries (pandas to 2.0.3, pyarrow to 4.0.0) in the Docker image for the release script.

### Why are the changes needed?
Without this change, the release script (do-release-docker.sh) fails on the docs phase. Changing this fixes the release process against branch-3.5.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Confirmed with a dry-run of the release script against branch-3.5.

### Was this patch authored or co-authored using generative AI tooling?
No.
Re: [PR] [SPARK-47040][CONNECT] Allow Spark Connect Server Script to wait [spark]
dongjoon-hyun commented on PR #45090: URL: https://github.com/apache/spark/pull/45090#issuecomment-1945326854 If so, we need to revert this. Could you confirm that the above works for you, @grundprinzip?
Re: [PR] [SPARK-47040][CONNECT] Allow Spark Connect Server Script to wait [spark]
dongjoon-hyun commented on PR #45090: URL: https://github.com/apache/spark/pull/45090#issuecomment-1945326307 Oh, right. Does it work the same way as this?
Re: [PR] [SPARK-47040][CONNECT] Allow Spark Connect Server Script to wait [spark]
pan3793 commented on PR #45090: URL: https://github.com/apache/spark/pull/45090#issuecomment-1945309543

> ... that leaves it running in the foreground ...

could it be achieved by the following command?

```
SPARK_NO_DAEMONIZE=1 ./sbin/start-connect-server.sh
```
Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]
chaoqin-li1123 commented on code in PR #45023: URL: https://github.com/apache/spark/pull/45023#discussion_r1490256089 ## python/pyspark/sql/streaming/python_streaming_source_runner.py: ## @@ -0,0 +1,178 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import os +import sys +import functools +import json +from itertools import islice +from typing import IO, List, Iterator, Iterable + +from pyspark.accumulators import _accumulatorRegistry +from pyspark.errors import PySparkAssertionError, PySparkRuntimeError +from pyspark.java_gateway import local_connect_and_auth +from pyspark.serializers import ( +read_int, +write_int, +write_with_length, +SpecialLengths, +) +from pyspark.sql.connect.conversion import ArrowTableToRowsConversion, LocalDataToArrowConversion +from pyspark.sql.datasource import DataSource, InputPartition +from pyspark.sql.pandas.types import to_arrow_schema +from pyspark.sql.types import ( +_parse_datatype_json_string, +BinaryType, +StructType, +) +from pyspark.util import handle_worker_exception +from pyspark.worker_util import ( +check_python_version, +read_command, +pickleSer, +send_accumulator_updates, +setup_broadcasts, +setup_memory_limits, +setup_spark_files, +utf8_deserializer, +) + +initial_offset_func_id = 
884 +latest_offset_func_id = 885 +partitions_func_id = 886 +commit_func_id = 887 + + +def initial_offset_func(reader, outfile): +offset = reader.initialOffset() +write_with_length(json.dumps(offset).encode("utf-8"), outfile) + + +def latest_offset_func(reader, outfile): +offset = reader.latestOffset() +write_with_length(json.dumps(offset).encode("utf-8"), outfile) + + +def partitions_func(reader, infile, outfile): +start_offset = json.loads(utf8_deserializer.loads(infile)) +end_offset = json.loads(utf8_deserializer.loads(infile)) +partitions = reader.partitions(start_offset, end_offset) +# Return the serialized partition values. +write_int(len(partitions), outfile) +for partition in partitions: +pickleSer._write_with_length(partition, outfile) + + +def commit_func(reader, infile, outfile): +end_offset = json.loads(utf8_deserializer.loads(infile)) +reader.commit(end_offset) +write_int(0, outfile) + + +def main(infile: IO, outfile: IO) -> None: +try: +check_python_version(infile) +setup_spark_files(infile) + +memory_limit_mb = int(os.environ.get("PYSPARK_PLANNER_MEMORY_MB", "-1")) +setup_memory_limits(memory_limit_mb) + +_accumulatorRegistry.clear() + +# Receive the data source instance. +data_source = read_command(pickleSer, infile) + +if not isinstance(data_source, DataSource): +raise PySparkAssertionError( +error_class="PYTHON_DATA_SOURCE_TYPE_MISMATCH", +message_parameters={ +"expected": "a Python data source instance of type 'DataSource'", +"actual": f"'{type(data_source).__name__}'", +}, +) + +# Receive the data source output schema. +schema_json = utf8_deserializer.loads(infile) Review Comment: We only use data source schema when user doesn't specify schema. https://github.com/apache/spark/blob/736d8ab3f00e7c5ba1b01c22f6398b636b8492ea/python/pyspark/sql/worker/create_data_source.py#L144 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
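The review comment above notes that the data source's own schema is consulted only when the user does not specify one. A minimal sketch of that fallback behavior (this is a hypothetical illustration, not the actual `create_data_source.py` code; `FakeDataSource` and `resolve_schema` are invented names):

```python
# Hypothetical sketch of the schema resolution described in the review
# comment: prefer the user-specified schema, fall back to the source's own.
class FakeDataSource:
    def schema(self):
        # Source-provided default schema (DDL string for illustration).
        return "id INT, value STRING"


def resolve_schema(data_source, user_schema=None):
    # User-specified schema wins; otherwise ask the data source.
    if user_schema is not None:
        return user_schema
    return data_source.schema()


print(resolve_schema(FakeDataSource()))                  # id INT, value STRING
print(resolve_schema(FakeDataSource(), "ts TIMESTAMP"))  # ts TIMESTAMP
```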
Re: [PR] [SPARK-47051][INFRA] Create a new test pipeline for `yarn` and `connect` [spark]
dongjoon-hyun commented on code in PR #45107: URL: https://github.com/apache/spark/pull/45107#discussion_r1490236219 ## .github/workflows/build_and_test.yml: ## @@ -147,8 +147,9 @@ jobs: mllib-local, mllib, graphx - >- streaming, sql-kafka-0-10, streaming-kafka-0-10, streaming-kinesis-asl, -yarn, kubernetes, hadoop-cloud, spark-ganglia-lgpl, -connect, protobuf +kubernetes, hadoop-cloud, spark-ganglia-lgpl, protobuf + - >- +yarn, connect Review Comment: Now, the `streaming, ...` pipeline run time is down to 62m. So, the total increased overhead is around 20min. ![Screenshot 2024-02-14 at 16 47 28](https://github.com/apache/spark/assets/9700541/18ba8ad1-3295-4aff-b853-bbc30f3c830a)
Re: [PR] [SPARK-47051][INFRA] Create a new test pipeline for `yarn` and `connect` [spark]
dongjoon-hyun commented on code in PR #45107: URL: https://github.com/apache/spark/pull/45107#discussion_r1490233363 ## .github/workflows/build_and_test.yml: ## @@ -147,8 +147,9 @@ jobs: mllib-local, mllib, graphx - >- streaming, sql-kafka-0-10, streaming-kafka-0-10, streaming-kinesis-asl, -yarn, kubernetes, hadoop-cloud, spark-ganglia-lgpl, -connect, protobuf +kubernetes, hadoop-cloud, spark-ganglia-lgpl, protobuf + - >- +yarn, connect Review Comment: Ya, I was also reluctant due to the build time. Specifically, it will take `34 minutes` in total run time. The second goal is to reduce the re-trigger time. Previously, the YARN/Connect/Kafka modules created a bad synergy because their failure rates are multiplied, and I needed to re-trigger and wait 70 to 90 minutes for this pipeline. ![Screenshot 2024-02-14 at 16 39 19](https://github.com/apache/spark/assets/9700541/e1afe7d3-8c0c-4ac0-93d0-34c810eb6366)
Re: [PR] [SPARK-45396][PYTHON] Add doc entry for `pyspark.ml.connect` module, and adds `Evaluator` to `__all__` at `ml.connect` [spark]
HyukjinKwon commented on PR #43210: URL: https://github.com/apache/spark/pull/43210#issuecomment-1945142540 Reverted at https://github.com/apache/spark/commit/ea6b25767fb86732c108c759fd5393caee22f129 in branch-3.5
Re: [PR] [Don't merge & review] verify sbt on master [spark]
github-actions[bot] closed pull request #43079: [Don't merge & review] verify sbt on master URL: https://github.com/apache/spark/pull/43079
Re: [PR] [SPARK-45669][CORE] Ensure the continuity of rolling log index [spark]
github-actions[bot] commented on PR #43534: URL: https://github.com/apache/spark/pull/43534#issuecomment-1945132826 We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!
Re: [PR] [SPARK-45396][PYTHON] Add doc entry for `pyspark.ml.connect` module, and adds `Evaluator` to `__all__` at `ml.connect` [spark]
HyukjinKwon commented on PR #43210: URL: https://github.com/apache/spark/pull/43210#issuecomment-1945132337 Let's revert it in branch-3.5 and fix it again. It's not a critical bug.
Re: [PR] [SPARK-47051][INFRA] Create a new test pipeline for `yarn` and `connect` [spark]
HyukjinKwon commented on code in PR #45107: URL: https://github.com/apache/spark/pull/45107#discussion_r1490216663 ## .github/workflows/build_and_test.yml: ## @@ -147,8 +147,9 @@ jobs: mllib-local, mllib, graphx - >- streaming, sql-kafka-0-10, streaming-kafka-0-10, streaming-kinesis-asl, -yarn, kubernetes, hadoop-cloud, spark-ganglia-lgpl, -connect, protobuf +kubernetes, hadoop-cloud, spark-ganglia-lgpl, protobuf + - >- +yarn, connect Review Comment: Hmmm ... Does separating the build help the stabilization? I am not against this change, but splitting this will add about 30 minutes of build time. Although it runs in parallel, each individual fork has some resource limitations.
Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]
chaoqin-li1123 commented on code in PR #45023: URL: https://github.com/apache/spark/pull/45023#discussion_r1490215001 ## sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonStreamingDataSourceSuite.scala: ## @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.python + +import org.apache.spark.SparkException +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.IntegratedUDFTestUtils.{createUserDefinedPythonDataSource, shouldTestPandasUDFs} +import org.apache.spark.sql.execution.datasources.v2.python.{PythonDataSourceV2, PythonMicroBatchStream, PythonStreamingSourceOffset} +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +class PythonStreamingDataSourceSuite extends PythonDataSourceSuiteBase { + + protected def simpleDataStreamReaderScript: String = +""" + |from pyspark.sql.datasource import DataSourceStreamReader, InputPartition + | + |class SimpleDataStreamReader(DataSourceStreamReader): + |def initialOffset(self): + |return {"0": "2"} + |def latestOffset(self): + |return {"0": "2"} Review Comment: Nice suggestion, changed. 
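The reader quoted in the test script above can be exercised as plain Python, without a Spark installation. This standalone sketch uses local stand-ins for `pyspark.sql.datasource.InputPartition` (an assumption made so the snippet is self-contained); the offset and partition values match the assertions in the Scala test:

```python
import json


# Local stand-in for pyspark.sql.datasource.InputPartition.
class InputPartition:
    def __init__(self, value):
        self.value = value


class SimpleDataStreamReader:
    def initialOffset(self):
        return {"0": "2"}

    def latestOffset(self):
        return {"0": "2"}

    def partitions(self, start, end):
        # One partition per unit of progress in the start offset.
        return [InputPartition(i) for i in range(int(start["0"]))]

    def read(self, partition):
        yield (0, partition.value)
        yield (1, partition.value)
        yield (2, partition.value)


reader = SimpleDataStreamReader()
start = reader.initialOffset()
parts = reader.partitions(start, reader.latestOffset())
print(json.dumps(start))             # {"0": "2"} -- same JSON the Scala test asserts
print(len(parts))                    # 2 -- matches planInputPartitions(...).size == 2
print(list(reader.read(parts[0])))   # [(0, 0), (1, 0), (2, 0)]
```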
Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]
chaoqin-li1123 commented on code in PR #45023: URL: https://github.com/apache/spark/pull/45023#discussion_r1490214832 ## sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonStreamingDataSourceSuite.scala: ## @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.execution.python + +import org.apache.spark.SparkException +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.IntegratedUDFTestUtils.{createUserDefinedPythonDataSource, shouldTestPandasUDFs} +import org.apache.spark.sql.execution.datasources.v2.python.{PythonDataSourceV2, PythonMicroBatchStream, PythonStreamingSourceOffset} +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +class PythonStreamingDataSourceSuite extends PythonDataSourceSuiteBase { + + protected def simpleDataStreamReaderScript: String = +""" + |from pyspark.sql.datasource import DataSourceStreamReader, InputPartition + | + |class SimpleDataStreamReader(DataSourceStreamReader): + |def initialOffset(self): + |return {"0": "2"} + |def latestOffset(self): + |return {"0": "2"} + |def partitions(self, start: dict, end: dict): + |return [InputPartition(i) for i in range(int(start["0"]))] + |def commit(self, end: dict): + |1 + 2 + |def read(self, partition): + |yield (0, partition.value) + |yield (1, partition.value) + |yield (2, partition.value) + |""".stripMargin + + protected def errorDataStreamReaderScript: String = +""" + |from pyspark.sql.datasource import DataSourceStreamReader, InputPartition + | + |class ErrorDataStreamReader(DataSourceStreamReader): + |def initialOffset(self): + |raise Exception("error reading initial offset") + |def latestOffset(self): + |raise Exception("error reading latest offset") + |def partitions(self, start: dict, end: dict): + |raise Exception("error planning partitions") + |def commit(self, end: dict): + |raise Exception("error committing offset") + |def read(self, partition): + |yield (0, partition.value) + |yield (1, partition.value) + |yield (2, partition.value) + |""".stripMargin + + private val errorDataSourceName = "ErrorDataSource" + + test("simple data stream source") { +assume(shouldTestPandasUDFs) +val dataSourceScript = + s""" + |from 
pyspark.sql.datasource import DataSource + |$simpleDataStreamReaderScript + | + |class $dataSourceName(DataSource): + |def streamReader(self, schema): + |return SimpleDataStreamReader() + |""".stripMargin +val inputSchema = StructType.fromDDL("input BINARY") + +val dataSource = createUserDefinedPythonDataSource(dataSourceName, dataSourceScript) +spark.dataSource.registerPython(dataSourceName, dataSource) +val pythonDs = new PythonDataSourceV2 +pythonDs.setShortName("SimpleDataSource") +val stream = new PythonMicroBatchStream( + pythonDs, dataSourceName, inputSchema, CaseInsensitiveStringMap.empty()) + +val initialOffset = stream.initialOffset() +assert(initialOffset.json == "{\"0\": \"2\"}") +for (_ <- 1 to 50) { + val offset = stream.latestOffset() + assert(offset.json == "{\"0\": \"2\"}") + assert(stream.planInputPartitions(offset, offset).size == 2) + stream.commit(offset) +} +stream.stop() + } + + test("Error creating stream reader") { +assume(shouldTestPandasUDFs) +val dataSourceScript = + s""" + |from pyspark.sql.datasource import DataSource + |class $dataSourceName(DataSource): + |def streamReader(self, schema): + |raise Exception("error creating stream reader") + |""".stripMargin +val dataSource = createUserDefinedPythonDataSource( + name = dataSourceName, pythonScript = dataSourceScript) +spark.dataSource.registerPython(dataSourceName, dataSourc
Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]
chaoqin-li1123 commented on code in PR #45023: URL: https://github.com/apache/spark/pull/45023#discussion_r1490214280 ## sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonStreamingSourceRunner.scala: ## @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.spark.sql.execution.python + +import java.io.{BufferedInputStream, BufferedOutputStream, DataInputStream, DataOutputStream} + +import scala.collection.mutable.ArrayBuffer +import scala.jdk.CollectionConverters._ + +import org.apache.spark.SparkEnv +import org.apache.spark.api.python.{PythonFunction, PythonWorker, PythonWorkerFactory, PythonWorkerUtils, SpecialLengths} +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.BUFFER_SIZE +import org.apache.spark.internal.config.Python.{PYTHON_AUTH_SOCKET_TIMEOUT, PYTHON_USE_DAEMON} +import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.StructType + +object PythonStreamingSourceRunner { + val initialOffsetFuncId = 884 + val latestOffsetFuncId = 885 + val partitionsFuncId = 886 + val commitFuncId = 887 +} + +/** + * This class is a proxy to invoke methods in Python DataSourceStreamReader from JVM. + * A runner spawns a python worker process. In the main function, set up communication + * between JVM and python process through socket and create a DataSourceStreamReader instance. + * In an infinite loop, the python worker process poll information(function name and parameters) + * from the socket, invoke the corresponding method of StreamReader and send return value to JVM. 
+ */ +class PythonStreamingSourceRunner( +func: PythonFunction, +outputSchema: StructType) extends Logging { + val workerModule = "pyspark.sql.streaming.python_streaming_source_runner" + + private val conf = SparkEnv.get.conf + protected val bufferSize: Int = conf.get(BUFFER_SIZE) + protected val authSocketTimeout = conf.get(PYTHON_AUTH_SOCKET_TIMEOUT) + + private val envVars: java.util.Map[String, String] = func.envVars + private val pythonExec: String = func.pythonExec + private var pythonWorker: Option[PythonWorker] = None + private var pythonWorkerFactory: Option[PythonWorkerFactory] = None + protected val pythonVer: String = func.pythonVer + + private var dataOut: DataOutputStream = null + private var dataIn: DataInputStream = null + + import PythonStreamingSourceRunner._ + + /** + * Initializes the Python worker for running the streaming source. + */ + def init(): Unit = { +logInfo(s"Initializing Python runner pythonExec: $pythonExec") +val env = SparkEnv.get + +val localdir = env.blockManager.diskBlockManager.localDirs.map(f => f.getPath()).mkString(",") +envVars.put("SPARK_LOCAL_DIRS", localdir) + +envVars.put("SPARK_AUTH_SOCKET_TIMEOUT", authSocketTimeout.toString) +envVars.put("SPARK_BUFFER_SIZE", bufferSize.toString) + +val prevConf = conf.get(PYTHON_USE_DAEMON) +conf.set(PYTHON_USE_DAEMON, false) +try { + val workerFactory = +new PythonWorkerFactory(pythonExec, workerModule, envVars.asScala.toMap) + val (worker: PythonWorker, _) = workerFactory.createSimpleWorker(blockingMode = true) + pythonWorker = Some(worker) + pythonWorkerFactory = Some(workerFactory) +} finally { + conf.set(PYTHON_USE_DAEMON, prevConf) +} + +val stream = new BufferedOutputStream( + pythonWorker.get.channel.socket().getOutputStream, bufferSize) +dataOut = new DataOutputStream(stream) + +PythonWorkerUtils.writePythonVersion(pythonVer, dataOut) + +val pythonIncludes = func.pythonIncludes.asScala.toSet +PythonWorkerUtils.writeSparkFiles(Some("streaming_job"), pythonIncludes, dataOut) 
+ +// Send the user function to python process +PythonWorkerUtils.writePythonFunction(func, dataOut) + +// Send output schema +PythonWorkerUtils.writeUTF(outputSchema.json, dataOut) + +// Send configurations +dataOut.writeInt(SQLConf.get.arrowMaxRecordsPerBatch) +dataOut.flush() + +dataIn = new DataInputStream( + new BufferedInputStream(pytho
Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]
chaoqin-li1123 commented on code in PR #45023: URL: https://github.com/apache/spark/pull/45023#discussion_r1490213270 ## python/pyspark/sql/streaming/python_streaming_source_runner.py: ## @@ -0,0 +1,178 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import os +import sys +import functools +import json +from itertools import islice +from typing import IO, List, Iterator, Iterable + +from pyspark.accumulators import _accumulatorRegistry +from pyspark.errors import PySparkAssertionError, PySparkRuntimeError +from pyspark.java_gateway import local_connect_and_auth +from pyspark.serializers import ( +read_int, +write_int, +write_with_length, +SpecialLengths, +) +from pyspark.sql.connect.conversion import ArrowTableToRowsConversion, LocalDataToArrowConversion +from pyspark.sql.datasource import DataSource, InputPartition +from pyspark.sql.pandas.types import to_arrow_schema +from pyspark.sql.types import ( +_parse_datatype_json_string, +BinaryType, +StructType, +) +from pyspark.util import handle_worker_exception +from pyspark.worker_util import ( +check_python_version, +read_command, +pickleSer, +send_accumulator_updates, +setup_broadcasts, +setup_memory_limits, +setup_spark_files, +utf8_deserializer, +) + +initial_offset_func_id = 
884 +latest_offset_func_id = 885 +partitions_func_id = 886 +commit_func_id = 887 + + +def initial_offset_func(reader, outfile): +offset = reader.initialOffset() +write_with_length(json.dumps(offset).encode("utf-8"), outfile) + + +def latest_offset_func(reader, outfile): +offset = reader.latestOffset() +write_with_length(json.dumps(offset).encode("utf-8"), outfile) + + +def partitions_func(reader, infile, outfile): +start_offset = json.loads(utf8_deserializer.loads(infile)) +end_offset = json.loads(utf8_deserializer.loads(infile)) +partitions = reader.partitions(start_offset, end_offset) +# Return the serialized partition values. +write_int(len(partitions), outfile) +for partition in partitions: +pickleSer._write_with_length(partition, outfile) + + +def commit_func(reader, infile, outfile): +end_offset = json.loads(utf8_deserializer.loads(infile)) +reader.commit(end_offset) +write_int(0, outfile) + + +def main(infile: IO, outfile: IO) -> None: +try: +check_python_version(infile) +setup_spark_files(infile) + +memory_limit_mb = int(os.environ.get("PYSPARK_PLANNER_MEMORY_MB", "-1")) +setup_memory_limits(memory_limit_mb) + +_accumulatorRegistry.clear() + +# Receive the data source instance. +data_source = read_command(pickleSer, infile) + +if not isinstance(data_source, DataSource): +raise PySparkAssertionError( +error_class="PYTHON_DATA_SOURCE_TYPE_MISMATCH", +message_parameters={ +"expected": "a Python data source instance of type 'DataSource'", +"actual": f"'{type(data_source).__name__}'", +}, +) + +# Receive the data source output schema. +schema_json = utf8_deserializer.loads(infile) +schema = _parse_datatype_json_string(schema_json) +if not isinstance(schema, StructType): +raise PySparkAssertionError( +error_class="PYTHON_DATA_SOURCE_TYPE_MISMATCH", +message_parameters={ +"expected": "an output schema of type 'StructType'", +"actual": f"'{type(schema).__name__}'", +}, +) + +# Receive the configuration values. Review Comment: Removed. 
It is no longer necessary because the pickled read function will be created from another Python worker.
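The offset plumbing in this thread is plain JSON over the worker socket: `latest_offset_func` sends `json.dumps(offset)` with length-prefixed framing, and the JVM side parses it back. A minimal sketch of that round-trip using an in-memory buffer instead of the real socket (the 4-byte big-endian length prefix is sketched from memory of `pyspark.serializers.write_with_length`; treat the exact framing as an assumption):

```python
import io
import json
import struct


def write_with_length(payload: bytes, outfile) -> None:
    # Frame a value as a 4-byte big-endian length followed by raw bytes.
    outfile.write(struct.pack("!i", len(payload)))
    outfile.write(payload)


def read_with_length(infile) -> bytes:
    (length,) = struct.unpack("!i", infile.read(4))
    return infile.read(length)


# Round-trip an offset dict the way latest_offset_func sends it.
offset = {"0": "2"}
buf = io.BytesIO()
write_with_length(json.dumps(offset).encode("utf-8"), buf)

buf.seek(0)
decoded = json.loads(read_with_length(buf).decode("utf-8"))
print(decoded)  # {'0': '2'}
```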
Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]
chaoqin-li1123 commented on code in PR #45023: URL: https://github.com/apache/spark/pull/45023#discussion_r1490211705 ## python/pyspark/sql/streaming/python_streaming_source_runner.py: ## @@ -0,0 +1,178 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import os +import sys +import functools +import json +from itertools import islice +from typing import IO, List, Iterator, Iterable + +from pyspark.accumulators import _accumulatorRegistry +from pyspark.errors import PySparkAssertionError, PySparkRuntimeError +from pyspark.java_gateway import local_connect_and_auth +from pyspark.serializers import ( +read_int, +write_int, +write_with_length, +SpecialLengths, +) +from pyspark.sql.connect.conversion import ArrowTableToRowsConversion, LocalDataToArrowConversion +from pyspark.sql.datasource import DataSource, InputPartition +from pyspark.sql.pandas.types import to_arrow_schema +from pyspark.sql.types import ( +_parse_datatype_json_string, +BinaryType, +StructType, +) +from pyspark.util import handle_worker_exception +from pyspark.worker_util import ( +check_python_version, +read_command, +pickleSer, +send_accumulator_updates, +setup_broadcasts, +setup_memory_limits, +setup_spark_files, +utf8_deserializer, +) + +initial_offset_func_id = 
884 +latest_offset_func_id = 885 +partitions_func_id = 886 +commit_func_id = 887 + + +def initial_offset_func(reader, outfile): +offset = reader.initialOffset() +write_with_length(json.dumps(offset).encode("utf-8"), outfile) + + +def latest_offset_func(reader, outfile): +offset = reader.latestOffset() +write_with_length(json.dumps(offset).encode("utf-8"), outfile) + + +def partitions_func(reader, infile, outfile): +start_offset = json.loads(utf8_deserializer.loads(infile)) +end_offset = json.loads(utf8_deserializer.loads(infile)) +partitions = reader.partitions(start_offset, end_offset) +# Return the serialized partition values. +write_int(len(partitions), outfile) +for partition in partitions: +pickleSer._write_with_length(partition, outfile) + + +def commit_func(reader, infile, outfile): +end_offset = json.loads(utf8_deserializer.loads(infile)) +reader.commit(end_offset) +write_int(0, outfile) + + +def main(infile: IO, outfile: IO) -> None: +try: +check_python_version(infile) +setup_spark_files(infile) + +memory_limit_mb = int(os.environ.get("PYSPARK_PLANNER_MEMORY_MB", "-1")) +setup_memory_limits(memory_limit_mb) + +_accumulatorRegistry.clear() + +# Receive the data source instance. +data_source = read_command(pickleSer, infile) + +if not isinstance(data_source, DataSource): +raise PySparkAssertionError( +error_class="PYTHON_DATA_SOURCE_TYPE_MISMATCH", +message_parameters={ +"expected": "a Python data source instance of type 'DataSource'", +"actual": f"'{type(data_source).__name__}'", +}, +) + +# Receive the data source output schema. +schema_json = utf8_deserializer.loads(infile) +schema = _parse_datatype_json_string(schema_json) +if not isinstance(schema, StructType): +raise PySparkAssertionError( +error_class="PYTHON_DATA_SOURCE_TYPE_MISMATCH", +message_parameters={ +"expected": "an output schema of type 'StructType'", +"actual": f"'{type(schema).__name__}'", +}, +) + +# Receive the configuration values. 
+max_arrow_batch_size = read_int(infile) +assert max_arrow_batch_size > 0, ( +"The maximum arrow batch size should be greater than 0, but got " +f"'{max_arrow_batch_size}'" +) + +# Instantiate data source reader. +try: +reader = data_source.streamReader(schema=schema) +# Initialization succeed. +write_int(0, outfile) +outfile.flush() + +# handle method call from socket +while True: +func_id = read_int(infile) +
[PR] [SPARK-47052][WIP] Separate state tracking variables from MicroBatchExecution/StreamExecution [spark]
jerrypeng opened a new pull request, #45109: URL: https://github.com/apache/spark/pull/45109 ### What changes were proposed in this pull request? To improve code clarity and maintainability, I propose that we move all the variables that track mutable state and metrics for a streaming query into a separate class. With this refactor, it would be easy to track and find all the mutable state a microbatch can have. ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? ### How was this patch tested? ### Was this patch authored or co-authored using generative AI tooling?
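The refactoring pattern the PR describes (the PR itself is Scala) can be sketched in Python: hoist the scattered mutable progress fields of the execution class into one dedicated state-holder, so all mutable state is findable behind a single attribute. All names below are invented for illustration and are not the actual MicroBatchExecution/StreamExecution fields:

```python
from dataclasses import dataclass, field


# Hypothetical holder grouping the mutable state a microbatch can have.
@dataclass
class ExecutionState:
    current_batch_id: int = -1
    is_current_batch_constructed: bool = False
    metrics: dict = field(default_factory=dict)


class MicroBatchExecutionSketch:
    def __init__(self):
        # All mutable state lives behind one attribute, making it easy
        # to track, reset, and reason about.
        self.state = ExecutionState()

    def run_batch(self):
        self.state.current_batch_id += 1
        self.state.is_current_batch_constructed = True
        self.state.metrics["numBatches"] = self.state.current_batch_id + 1


exec_ = MicroBatchExecutionSketch()
exec_.run_batch()
exec_.run_batch()
print(exec_.state.current_batch_id)  # 1
```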
Re: [PR] [SPARK-47051][INFRA] Create a new test pipeline for `yarn` and `connect` [spark]
dongjoon-hyun commented on PR #45107: URL: https://github.com/apache/spark/pull/45107#issuecomment-1945067506 Could you review this test pipeline adjustment PR, @HyukjinKwon?
[PR] [wip] value state ttl poc [spark]
ericm-db opened a new pull request, #45108: URL: https://github.com/apache/spark/pull/45108 ### What changes were proposed in this pull request? ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? ### How was this patch tested? ### Was this patch authored or co-authored using generative AI tooling?
[PR] [SPARK-47051][INFRA] Create a new test pipeline for `yarn` and `connect` [spark]
dongjoon-hyun opened a new pull request, #45107: URL: https://github.com/apache/spark/pull/45107 ### What changes were proposed in this pull request? ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? ### How was this patch tested? ### Was this patch authored or co-authored using generative AI tooling?
Re: [PR] [SPARK-47049][BUILD] Ban non-shaded Hadoop dependencies [spark]
dongjoon-hyun closed pull request #45106: [SPARK-47049][BUILD] Ban non-shaded Hadoop dependencies URL: https://github.com/apache/spark/pull/45106
Re: [PR] [SPARK-47049][BUILD] Ban non-shaded Hadoop dependencies [spark]
dongjoon-hyun commented on PR #45106: URL: https://github.com/apache/spark/pull/45106#issuecomment-1944840049 Thank you, @sunchao . Yes, it's irrelevant to this. Merged to master.
Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]
HeartSaVioR commented on code in PR #45023: URL: https://github.com/apache/spark/pull/45023#discussion_r1490134034 ## python/pyspark/sql/datasource.py: ## @@ -298,6 +320,104 @@ def read(self, partition: InputPartition) -> Iterator[Union[Tuple, Row]]: ... +class DataSourceStreamReader(ABC): +""" +A base class for streaming data source readers. Data source stream readers are responsible +for outputting data from a streaming data source. + +.. versionadded: 4.0.0 +""" + +def initialOffset(self) -> dict: +""" +Return the initial offset of the streaming data source. +A new streaming query starts reading data from the initial offset. +If Spark is restarting an existing query, it will restart from the check-pointed offset +rather than the initial one. + +Returns +--- +dict +A dict whose key and values are str type. Review Comment: UPDATE: we are discussing offline now and it could take time - this is tied to the interface and extremely hard to change after it is shipped.
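The interface sketched in the hunk above can be exercised outside of Spark. Below is a minimal, self-contained Python sketch (plain `abc`, no pyspark; `KafkaLikeStreamReader` and its partition keys are hypothetical) of a reader returning a str-to-str offset dict:

```python
from abc import ABC, abstractmethod


class DataSourceStreamReader(ABC):
    """Simplified stand-in for the interface proposed in the PR (not pyspark's class)."""

    @abstractmethod
    def initialOffset(self) -> dict:
        """Offset a brand-new query starts from; a restarted query uses the checkpoint instead."""
        ...


class KafkaLikeStreamReader(DataSourceStreamReader):
    # Hypothetical reader that tracks one position per partition.
    def initialOffset(self) -> dict:
        # str keys and str values, so the offset round-trips through JSON unchanged.
        return {"partition-0": "0", "partition-1": "0"}
```

Keeping both keys and values as strings sidesteps the type-restriction concern raised later in the thread, since JSON round-tripping cannot change the value types.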
Re: [PR] [SPARK-46906][SS] Add a check for stateful operator change for streaming [spark]
jingz-db commented on code in PR #44927: URL: https://github.com/apache/spark/pull/44927#discussion_r1490116430 ## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadataSuite.scala: ## @@ -215,4 +216,117 @@ class OperatorStateMetadataSuite extends StreamTest with SharedSparkSession { checkError(exc, "STDS_REQUIRED_OPTION_UNSPECIFIED", "42601", Map("optionName" -> StateSourceOptions.PATH)) } + + test("Operator metadata path non-existence should not fail query") { +withTempDir { checkpointDir => + val inputData = MemoryStream[Int] + val aggregated = +inputData.toDF() + .groupBy($"value") + .agg(count("*")) + .as[(Int, Long)] + + testStream(aggregated, Complete)( +StartStream(checkpointLocation = checkpointDir.toString), +AddData(inputData, 3), +CheckLastBatch((3, 1)), +StopStream + ) + + // Delete operator metadata path + val metadataPath = new Path(checkpointDir.toString, s"state/0/_metadata/metadata") + val fm = CheckpointFileManager.create(new Path(checkpointDir.getCanonicalPath), hadoopConf) + fm.delete(metadataPath) + + // Restart the query + testStream(aggregated, Complete)( +StartStream(checkpointLocation = checkpointDir.toString), +AddData(inputData, 3), +CheckLastBatch((3, 2)), +StopStream + ) +} + } + + test("Changing operator - " + +"replace, add, remove operators will trigger error with debug message") { +withTempDir { checkpointDir => + val inputData = MemoryStream[Int] + val stream = inputData.toDF().withColumn("eventTime", timestamp_seconds($"value")) + + testStream(stream.dropDuplicates())( +StartStream(checkpointLocation = checkpointDir.toString), +AddData(inputData, 1), +ProcessAllAvailable(), +StopStream + ) + + def checkOpChangeError(OpsInMetadataSeq: Seq[String], Review Comment: Got it! Thanks Jungtaek! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]
chaoqin-li1123 commented on code in PR #45023: URL: https://github.com/apache/spark/pull/45023#discussion_r1490111896 ## python/pyspark/sql/datasource.py: (same hunk as above) Review Comment: That essentially means we need to add the deserializeOffset() interface to the StreamReader for deserialization. Or is there any alternative?
Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]
HeartSaVioR commented on code in PR #45023: URL: https://github.com/apache/spark/pull/45023#discussion_r1490109817 ## python/pyspark/sql/datasource.py: (same hunk as above) Review Comment: Maybe yes. I'm OK with dict with clearly explaining which types they can use as values (on the flip side, we still need to restrict the available types since we have to convert this to json), but it is probably less headache to let the data source implementation take care of it themselves.
Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]
HeartSaVioR commented on code in PR #45023: URL: https://github.com/apache/spark/pull/45023#discussion_r1490107780 ## python/pyspark/sql/datasource.py: (same hunk as above) Review Comment: So the data source implementation provides serde between its offset model <-> json, and the python worker will handle it with the serde being provided. For python worker <-> JVM, json is used. Does this make sense?
Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]
chaoqin-li1123 commented on code in PR #45023: URL: https://github.com/apache/spark/pull/45023#discussion_r1490106637 ## python/pyspark/sql/datasource.py: (same hunk as above) Review Comment: Does that mean we also need to add deserializeOffset() to the python interface?
Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]
HeartSaVioR commented on code in PR #45023: URL: https://github.com/apache/spark/pull/45023#discussion_r1490104863 ## python/pyspark/sql/datasource.py: (same hunk as above) Review Comment: We expect the data source to know about its offset model - we pass a json form of the offset to the specific data source, which is expected to deserialize it successfully into its offset model. It will result in a runtime error if the json form of the offset and the offset model aren't compatible, which I think is the same on the Scala side.
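The serde contract described here can be sketched in plain Python (the `MyOffset` class and its fields are hypothetical, standing in for a data source's own offset model): the data source owns the model and its json serde, while only the json string crosses the python worker <-> JVM boundary.

```python
import json
from dataclasses import dataclass


@dataclass
class MyOffset:
    # Hypothetical offset model owned by the data source implementation.
    version: int
    position: int

    def json(self) -> str:
        # Serialized form handed to the python worker and stored by the JVM.
        return json.dumps({"version": self.version, "position": self.position})

    @classmethod
    def from_json(cls, s: str) -> "MyOffset":
        # Deserialization back into the data source's own offset model;
        # an incompatible json form surfaces here as a runtime error.
        d = json.loads(s)
        return cls(version=d["version"], position=d["position"])


# Round trip: the JVM only ever sees the json string.
restored = MyOffset.from_json(MyOffset(1, 42).json())
```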
Re: [PR] [SPARK-46906][SS] Add a check for stateful operator change for streaming [spark]
HeartSaVioR commented on code in PR #44927: URL: https://github.com/apache/spark/pull/44927#discussion_r1490093460 ## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadataSuite.scala: ## @@ -215,4 +215,79 @@ class OperatorStateMetadataSuite extends StreamTest with SharedSparkSession { checkError(exc, "STDS_REQUIRED_OPTION_UNSPECIFIED", "42601", Map("optionName" -> StateSourceOptions.PATH)) } + + test("Operator metadata path non-existence should not fail query") { +withTempDir { checkpointDir => + val inputData = MemoryStream[Int] + val aggregated = +inputData.toDF() + .groupBy($"value") + .agg(count("*")) + .as[(Int, Long)] + + testStream(aggregated, Complete)( +StartStream(checkpointLocation = checkpointDir.toString), +AddData(inputData, 3), +CheckLastBatch((3, 1)), +StopStream + ) + + // Delete operator metadata path + val metadataPath = new Path(checkpointDir.toString, s"state/0/_metadata/metadata") + val fm = CheckpointFileManager.create(new Path(checkpointDir.getCanonicalPath), hadoopConf) + fm.delete(metadataPath) + + // Restart the query + testStream(aggregated, Complete)( +StartStream(checkpointLocation = checkpointDir.toString), +AddData(inputData, 3), +CheckLastBatch((3, 2)), +StopStream + ) +} + } + + Seq("Replace", "Add", "Remove").foreach { operation => +test(s"$operation stateful operator will trigger error with guidance") { + withTempDir { checkpointDir => +val inputData = MemoryStream[Int] +val stream = inputData.toDF().withColumn("eventTime", timestamp_seconds($"value")) + +testStream(stream.dropDuplicates())( + StartStream(checkpointLocation = checkpointDir.toString), + AddData(inputData, 1), + ProcessAllAvailable(), + StopStream) + +val (opsInMetadataSeq, opsInCurBatchSeq, restartStream) = operation match { + case "Add" => +( + Map(0L -> "dedupe"), + Map(0L -> "stateStoreSave", 1L -> "dedupe"), + stream.dropDuplicates().groupBy("value").count()) + case "Replace" => +(Map(0L -> "dedupe"), Map(0L -> 
"stateStoreSave"), stream.groupBy("value").count()) + case "Remove" => +(Map(0L -> "dedupe"), Map.empty[Long, String], stream) +} + +testStream(restartStream, Update)( + StartStream(checkpointLocation = checkpointDir.toString), + AddData(inputData, 3), + ExpectFailure[SparkRuntimeException] { t => { Review Comment: super nit: another {} is unnecessary after `{ t =>`. multiple lines are allowed after `=>`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47049][BUILD] Ban non-shaded Hadoop dependencies [spark]
dongjoon-hyun commented on PR #45106: URL: https://github.com/apache/spark/pull/45106#issuecomment-1944653099 Could you review this, please, @sunchao ?
Re: [PR] [SPARK-46906][SS] Add a check for stateful operator change for streaming [spark]
HeartSaVioR commented on code in PR #44927: URL: https://github.com/apache/spark/pull/44927#discussion_r1490087695 ## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadataSuite.scala: ## @@ -215,4 +216,117 @@ class OperatorStateMetadataSuite extends StreamTest with SharedSparkSession { checkError(exc, "STDS_REQUIRED_OPTION_UNSPECIFIED", "42601", Map("optionName" -> StateSourceOptions.PATH)) } + + test("Operator metadata path non-existence should not fail query") { +withTempDir { checkpointDir => + val inputData = MemoryStream[Int] + val aggregated = +inputData.toDF() + .groupBy($"value") + .agg(count("*")) + .as[(Int, Long)] + + testStream(aggregated, Complete)( +StartStream(checkpointLocation = checkpointDir.toString), +AddData(inputData, 3), +CheckLastBatch((3, 1)), +StopStream + ) + + // Delete operator metadata path + val metadataPath = new Path(checkpointDir.toString, s"state/0/_metadata/metadata") + val fm = CheckpointFileManager.create(new Path(checkpointDir.getCanonicalPath), hadoopConf) + fm.delete(metadataPath) + + // Restart the query + testStream(aggregated, Complete)( +StartStream(checkpointLocation = checkpointDir.toString), +AddData(inputData, 3), +CheckLastBatch((3, 2)), +StopStream + ) +} + } + + test("Changing operator - " + +"replace, add, remove operators will trigger error with debug message") { +withTempDir { checkpointDir => + val inputData = MemoryStream[Int] + val stream = inputData.toDF().withColumn("eventTime", timestamp_seconds($"value")) + + testStream(stream.dropDuplicates())( +StartStream(checkpointLocation = checkpointDir.toString), +AddData(inputData, 1), +ProcessAllAvailable(), +StopStream + ) + + def checkOpChangeError(OpsInMetadataSeq: Seq[String], Review Comment: Maybe community has to run the formatter at once for the whole codebase. I'm not sure scalafmt can deal with the whole styles though. 
It is still good to familiarize yourself with the Databricks Scala style guide; it doesn't only contain styles that automation can handle.
Re: [PR] [SS][SPARK-46928] Add support for ListState in Arbitrary State API v2. [spark]
HeartSaVioR commented on code in PR #44961: URL: https://github.com/apache/spark/pull/44961#discussion_r1490075730 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala: ## @@ -75,6 +76,61 @@ private[sql] class RocksDBStateStoreProvider value } +/** + * Provides an iterator containing all values of a non-null key. + * + * Inside RocksDB, the values are merged together and stored as a byte Array. + * This operation relies on state store value encoder to be able to split the + * single array into multiple values. + * + * Also see [[MultiValuedStateEncoder]] which supports encoding/decoding multiple + * values per key. + */ +override def valuesIterator(key: UnsafeRow, colFamilyName: String): Iterator[UnsafeRow] = { + verify(key != null, "Key cannot be null") + + val kvEncoder = keyValueEncoderMap.get(colFamilyName) + val valueEncoder = kvEncoder._2 + val keyEncoder = kvEncoder._1 + + verify(valueEncoder.supportsMultipleValuesPerKey, "valuesIterator requires a encoder " + + "that supports multiple values for a single key.") + val encodedKey = rocksDB.get(keyEncoder.encodeKey(key), colFamilyName) + val valueIterator = valueEncoder.decodeValues(encodedKey) + + if (valueIterator.nonEmpty) { +new Iterator[UnsafeRow] { + override def hasNext: Boolean = { +valueIterator.hasNext + } + + override def next(): UnsafeRow = { +val value = valueIterator.next() +if (value != null) { + StateStoreProvider.validateStateRowFormat( Review Comment: This triggers overhead so we wouldn't like to do this for every value. Either we have to sample (that's why we only do for the first value) or not doing this at all. It's probably OK to do latter as get() also doesn't do this for the case where there are multiple CFs. 
## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ValueStateImpl.scala: ## @@ -77,9 +50,7 @@ class ValueStateImpl[S]( override def getOption(): Option[S] = { val retRow = getImpl() Review Comment: I know this is beyond the scope of PR, but while we are here, I don't see any difference with `Option(get())`. ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala: ## @@ -131,6 +141,15 @@ trait StateStore extends ReadStateStore { def remove(key: UnsafeRow, colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Unit + /** + * Merges the provided value with existing values of a non-null key. Review Comment: Maybe we want to explicitly document the contract on the behavior when performing merge against non-existing key, since all implementations should provide the same. ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala: ## @@ -67,6 +67,15 @@ trait ReadStateStore { def get(key: UnsafeRow, colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME): UnsafeRow + /** + * Provides an iterator containing all values of a non-null key. Review Comment: Let's make a contract about expected return value, especially about clarification of value for non-existing key. Multiple implementations should follow the contract so this needs to be clearly explained. ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ListStateImpl.scala: ## @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.streaming + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder +import org.apache.spark.sql.execution.streaming.StateKeyValueRowSchema.{KEY_ROW_SCHEMA, VALUE_ROW_SCHEMA} +import org.apache.spark.sql.execution.streaming.state.{StateStore, StateStoreErrors} +import org.apache.spark.sql.streaming.ListState + +/** + * Provides concrete implementation for list of values associated with a state variable + * used in the streaming transformWithState operator. + * + * @param store - reference to the St
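This is not the actual `MultiValuedStateEncoder` (which operates on UnsafeRows in Scala), but the merge-and-split idea discussed above - multiple values per key stored as one byte array - can be illustrated with a toy length-prefixed scheme in Python:

```python
import struct
from typing import Iterator, List


def merge_values(values: List[bytes]) -> bytes:
    # Each value is stored as a 4-byte big-endian length prefix followed by its bytes.
    return b"".join(struct.pack(">I", len(v)) + v for v in values)


def decode_values(merged: bytes) -> Iterator[bytes]:
    # Split the single merged array back into the individual values.
    pos = 0
    while pos < len(merged):
        (n,) = struct.unpack_from(">I", merged, pos)
        pos += 4
        yield merged[pos:pos + n]
        pos += n


def merge(existing: bytes, new_value: bytes) -> bytes:
    # A merge against an existing key just appends one more encoded value.
    return existing + struct.pack(">I", len(new_value)) + new_value
```

The length prefix is what lets a `valuesIterator`-style API lazily yield one value at a time instead of materializing the whole list.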
Re: [PR] [SPARK-46906][SS] Add a check for stateful operator change for streaming [spark]
jingz-db commented on PR #44927: URL: https://github.com/apache/spark/pull/44927#issuecomment-1944494272 Thanks Jungtaek for your thorough code review!
Re: [PR] [SS][SPARK-46928] Add support for ListState in Arbitrary State API v2. [spark]
sahnib commented on code in PR #44961: URL: https://github.com/apache/spark/pull/44961#discussion_r1489986029 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ListStateImpl.scala: ## @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.streaming + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder +import org.apache.spark.sql.execution.streaming.state.{StateStore, StateStoreErrors} +import org.apache.spark.sql.streaming.ListState + +/** + * Provides concrete implementation for list of values associated with a state variable + * used in the streaming transformWithState operator. + * + * @param store - reference to the StateStore instance to be used for storing state + * @param stateName - name of logical state partition + * @tparam S - data type of object that will be stored in the list + */ +class ListStateImpl[S](store: StateStore, + stateName: String, + keyExprEnc: ExpressionEncoder[Any]) + extends ListState[S] with Logging { + + /** Whether state exists or not. 
*/ + override def exists(): Boolean = { + val encodedGroupingKey = StateTypesEncoderUtils.encodeGroupingKey(stateName, keyExprEnc) + val stateValue = store.get(encodedGroupingKey, stateName) + stateValue != null + } + + /** Get the state value if it exists. If the state does not exist in state store, an +* empty iterator is returned. */ + override def get(): Iterator[S] = { + val encodedKey = StateTypesEncoderUtils.encodeGroupingKey(stateName, keyExprEnc) + val unsafeRowValuesIterator = store.valuesIterator(encodedKey, stateName) + new Iterator[S] { + override def hasNext: Boolean = { + unsafeRowValuesIterator.hasNext + } + + override def next(): S = { + val valueUnsafeRow = unsafeRowValuesIterator.next() + StateTypesEncoderUtils.decodeValue(valueUnsafeRow) + } + } + } + + /** Get the list value as an option if it exists and None otherwise. */ + override def getOption(): Option[Iterator[S]] = { + Option(get()) + } + + /** Update the value of the list. */ + override def put(newState: Array[S]): Unit = { + validateNewState(newState) + + if (newState.isEmpty) { + this.clear() Review Comment: Added a validation for put/appendList to ensure empty lists cannot be passed. Passing an empty list throws a user error. I have also removed `getOption` as discussed.
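The semantics settled in this thread - `put`/`appendList` reject empty lists as a user error, and `getOption` is gone - can be mimicked with a toy in-memory stand-in (hypothetical Python, not the Spark API, which is Scala and backed by a state store):

```python
class ToyListState:
    # In-memory stand-in for the proposed ListState semantics.
    def __init__(self):
        self._values = None  # None means the state does not exist

    def exists(self) -> bool:
        return self._values is not None

    def get(self):
        # Empty iterator when the state does not exist; no getOption needed.
        return iter(self._values or [])

    def put(self, new_state: list) -> None:
        # Per the review outcome: an empty list is a user error, not an implicit clear().
        if not new_state:
            raise ValueError("newState cannot be empty; call clear() instead")
        self._values = list(new_state)

    def appendList(self, new_state: list) -> None:
        if not new_state:
            raise ValueError("newState cannot be empty")
        self._values = (self._values or []) + list(new_state)

    def clear(self) -> None:
        self._values = None
```

Dropping `getOption` works because `get()` already distinguishes the cases cheaply: callers check `exists()` or simply iterate the (possibly empty) result.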
Re: [PR] [SPARK-46906][SS] Add a check for stateful operator change for streaming [spark]
jingz-db commented on code in PR #44927: URL: https://github.com/apache/spark/pull/44927#discussion_r1489982761 ## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadataSuite.scala: ## @@ -215,4 +216,117 @@ class OperatorStateMetadataSuite extends StreamTest with SharedSparkSession { checkError(exc, "STDS_REQUIRED_OPTION_UNSPECIFIED", "42601", Map("optionName" -> StateSourceOptions.PATH)) } + + test("Operator metadata path non-existence should not fail query") { +withTempDir { checkpointDir => + val inputData = MemoryStream[Int] + val aggregated = +inputData.toDF() + .groupBy($"value") + .agg(count("*")) + .as[(Int, Long)] + + testStream(aggregated, Complete)( +StartStream(checkpointLocation = checkpointDir.toString), +AddData(inputData, 3), +CheckLastBatch((3, 1)), +StopStream + ) + + // Delete operator metadata path + val metadataPath = new Path(checkpointDir.toString, s"state/0/_metadata/metadata") + val fm = CheckpointFileManager.create(new Path(checkpointDir.getCanonicalPath), hadoopConf) + fm.delete(metadataPath) + + // Restart the query + testStream(aggregated, Complete)( +StartStream(checkpointLocation = checkpointDir.toString), +AddData(inputData, 3), +CheckLastBatch((3, 2)), +StopStream + ) +} + } + + test("Changing operator - " + +"replace, add, remove operators will trigger error with debug message") { +withTempDir { checkpointDir => + val inputData = MemoryStream[Int] + val stream = inputData.toDF().withColumn("eventTime", timestamp_seconds($"value")) + + testStream(stream.dropDuplicates())( +StartStream(checkpointLocation = checkpointDir.toString), +AddData(inputData, 1), +ProcessAllAvailable(), +StopStream + ) + + def checkOpChangeError(OpsInMetadataSeq: Seq[String], Review Comment: Do we have any automation tool for checking this other than `./dev/scalafmt`? 
This command is listed on the [spark developer tool wiki](https://spark.apache.org/developer-tools.html#:~:text=the%20style%20guide.-,Formatting%20code,-To%20format%20Scala), and it is actually quite messy - it touches all existing files rather than formatting only my code change.
Re: [PR] [SPARK-46906][SS] Add a check for stateful operator change for streaming [spark]
jingz-db commented on code in PR #44927: URL: https://github.com/apache/spark/pull/44927#discussion_r1489977792 ## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadataSuite.scala: ## @@ -215,4 +216,117 @@ class OperatorStateMetadataSuite extends StreamTest with SharedSparkSession { checkError(exc, "STDS_REQUIRED_OPTION_UNSPECIFIED", "42601", Map("optionName" -> StateSourceOptions.PATH)) } + + test("Operator metadata path non-existence should not fail query") { +withTempDir { checkpointDir => + val inputData = MemoryStream[Int] + val aggregated = +inputData.toDF() + .groupBy($"value") + .agg(count("*")) + .as[(Int, Long)] + + testStream(aggregated, Complete)( +StartStream(checkpointLocation = checkpointDir.toString), +AddData(inputData, 3), +CheckLastBatch((3, 1)), +StopStream + ) + + // Delete operator metadata path + val metadataPath = new Path(checkpointDir.toString, s"state/0/_metadata/metadata") + val fm = CheckpointFileManager.create(new Path(checkpointDir.getCanonicalPath), hadoopConf) + fm.delete(metadataPath) + + // Restart the query + testStream(aggregated, Complete)( +StartStream(checkpointLocation = checkpointDir.toString), +AddData(inputData, 3), +CheckLastBatch((3, 2)), +StopStream + ) +} + } + + test("Changing operator - " + +"replace, add, remove operators will trigger error with debug message") { +withTempDir { checkpointDir => + val inputData = MemoryStream[Int] + val stream = inputData.toDF().withColumn("eventTime", timestamp_seconds($"value")) + + testStream(stream.dropDuplicates())( +StartStream(checkpointLocation = checkpointDir.toString), +AddData(inputData, 1), +ProcessAllAvailable(), +StopStream + ) + + def checkOpChangeError(OpsInMetadataSeq: Seq[String], + OpsInCurBatchSeq: Seq[String], + ex: Throwable): Unit = { +checkError(ex.asInstanceOf[SparkRuntimeException], + "STREAMING_STATEFUL_OPERATOR_NOT_MATCH_IN_STATE_METADATA", "42K03", + Map("OpsInMetadataSeq" -> OpsInMetadataSeq.mkString(", "), 
+"OpsInCurBatchSeq" -> OpsInCurBatchSeq.mkString(", ")) +) + } + + // replace dropDuplicates with dropDuplicatesWithinWatermark + testStream(stream.withWatermark("eventTime", "10 seconds") +.dropDuplicatesWithinWatermark(), Append)( +StartStream(checkpointLocation = checkpointDir.toString), +AddData(inputData, 2), +ExpectFailure[SparkRuntimeException] { + (t: Throwable) => { +checkOpChangeError(Seq("dedupe"), Seq("dedupeWithinWatermark"), t) + } +} + ) + + // replace operator + testStream(stream.groupBy("value").count(), Update)( +StartStream(checkpointLocation = checkpointDir.toString), +AddData(inputData, 3), +ExpectFailure[SparkRuntimeException] { + (t: Throwable) => { +checkOpChangeError(Seq("dedupe"), Seq("stateStoreSave"), t) + } +} + ) + + // add operator + testStream(stream.dropDuplicates() +.groupBy("value").count(), Update)( +StartStream(checkpointLocation = checkpointDir.toString), +AddData(inputData, 3), +ExpectFailure[SparkRuntimeException] { + (t: Throwable) => { +checkOpChangeError(Seq("dedupe"), Seq("stateStoreSave", "dedupe"), t) + } +} + ) + + // remove operator Review Comment: > Do we disallow stateful query to be stateless? We don't allow even before adding the operator check. Streaming will throw error with message as "state path not found". > E.g. could you simply test the removal of stateful operator with checkpointDir rather than spinning up another checkpoint? Done. Restarting a stateless query from a stateful query will now trigger error with message as: ```bash [STREAMING_STATEFUL_OPERATOR_NOT_MATCH_IN_STATE_METADATA] Streaming stateful operator name does not match with the operator in state metadata. This likely to happen when user adds/removes/changes stateful operator of existing streaming query. Stateful operators in the metadata: [(OperatorId: 0 -> OperatorName: dedupe)]; Stateful operators in current batch: []. SQLSTATE: 42K03 ``` -- This is an automated message from the Apache Git Service. 
Re: [PR] [SPARK-47045][SQL] Replace `IllegalArgumentException` by `SparkIllegalArgumentException` in `sql/api` [spark]
MaxGekk commented on code in PR #45098: URL: https://github.com/apache/spark/pull/45098#discussion_r1489927146 ## common/utils/src/main/resources/error/error-classes.json: ## @@ -7767,6 +7767,76 @@ "Single backslash is prohibited. It has special meaning as beginning of an escape sequence. To get the backslash character, pass a string with two backslashes as the delimiter." ] }, + "_LEGACY_ERROR_TEMP_3249" : { +"message" : [ + "Failed to convert value (class of }) with the type of to JSON." +] + }, + "_LEGACY_ERROR_TEMP_3250" : { +"message" : [ + "Failed to convert the JSON string '' to a field." +] + }, + "_LEGACY_ERROR_TEMP_3251" : { +"message" : [ + "Failed to convert the JSON string '' to a data type." +] + }, + "_LEGACY_ERROR_TEMP_3252" : { +"message" : [ + " does not exist. Available: " +] + }, + "_LEGACY_ERROR_TEMP_3253" : { +"message" : [ + " do(es) not exist. Available: " +] + }, + "_LEGACY_ERROR_TEMP_3254" : { +"message" : [ + " does not exist. Available: " +] + }, + "_LEGACY_ERROR_TEMP_3255" : { +"message" : [ + "Error parsing '' to interval, " +] + }, + "_LEGACY_ERROR_TEMP_3256" : { +"message" : [ + "Unrecognized datetime pattern: " +] + }, + "_LEGACY_ERROR_TEMP_3257" : { +"message" : [ + "All week-based patterns are unsupported since Spark 3.0, detected: , Please use the SQL function EXTRACT instead" Review Comment: I preserved the punctuation of the original error intentionally: https://github.com/apache/spark/pull/45098/files#diff-1ada8897c412e27c3f73c8f5449f62f1fdc805b979cc7cfa35fcf8ad031529bbL313 so as not to break the existing tests. We should improve these errors when assigning proper error class names and writing tests for them.
Re: [PR] [SPARK-47045][SQL] Replace `IllegalArgumentException` by `SparkIllegalArgumentException` in `sql/api` [spark]
xinrong-meng commented on code in PR #45098: URL: https://github.com/apache/spark/pull/45098#discussion_r1489921713 ## common/utils/src/main/resources/error/error-classes.json: ## @@ -7767,6 +7767,76 @@ "Single backslash is prohibited. It has special meaning as beginning of an escape sequence. To get the backslash character, pass a string with two backslashes as the delimiter." ] }, + "_LEGACY_ERROR_TEMP_3249" : { +"message" : [ + "Failed to convert value (class of }) with the type of to JSON." +] + }, + "_LEGACY_ERROR_TEMP_3250" : { +"message" : [ + "Failed to convert the JSON string '' to a field." +] + }, + "_LEGACY_ERROR_TEMP_3251" : { +"message" : [ + "Failed to convert the JSON string '' to a data type." +] + }, + "_LEGACY_ERROR_TEMP_3252" : { +"message" : [ + " does not exist. Available: " +] + }, + "_LEGACY_ERROR_TEMP_3253" : { +"message" : [ + " do(es) not exist. Available: " +] + }, + "_LEGACY_ERROR_TEMP_3254" : { +"message" : [ + " does not exist. Available: " +] + }, + "_LEGACY_ERROR_TEMP_3255" : { +"message" : [ + "Error parsing '' to interval, " +] + }, + "_LEGACY_ERROR_TEMP_3256" : { +"message" : [ + "Unrecognized datetime pattern: " +] + }, + "_LEGACY_ERROR_TEMP_3257" : { +"message" : [ + "All week-based patterns are unsupported since Spark 3.0, detected: , Please use the SQL function EXTRACT instead" Review Comment: nit: punctuation in `detected: , Please` could be improved, feel free to ignore, though :)
[PR] [SPARK-47049][BUILD] Ban non-shaded Hadoop dependencies [spark]
dongjoon-hyun opened a new pull request, #45106: URL: https://github.com/apache/spark/pull/45106 ### What changes were proposed in this pull request? ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? ### How was this patch tested? ### Was this patch authored or co-authored using generative AI tooling?
Re: [PR] [SPARK-47014][PYTHON][CONNECT] Implement methods dumpPerfProfiles and dumpMemoryProfiles of SparkSession [spark]
xinrong-meng commented on PR #45073: URL: https://github.com/apache/spark/pull/45073#issuecomment-1944388920 Merged to master, thank you!
Re: [PR] [SPARK-47014][PYTHON][CONNECT] Implement methods dumpPerfProfiles and dumpMemoryProfiles of SparkSession [spark]
xinrong-meng closed pull request #45073: [SPARK-47014][PYTHON][CONNECT] Implement methods dumpPerfProfiles and dumpMemoryProfiles of SparkSession URL: https://github.com/apache/spark/pull/45073
Re: [PR] [SS][SPARK-46928] Add support for ListState in Arbitrary State API v2. [spark]
anishshri-db commented on code in PR #44961: URL: https://github.com/apache/spark/pull/44961#discussion_r1489907739 ## sql/api/src/main/scala/org/apache/spark/sql/streaming/ValueState.scala: ## @@ -46,5 +46,5 @@ private[sql] trait ValueState[S] extends Serializable { def update(newState: S): Unit /** Remove this state. */ - def remove(): Unit + def clear(): Unit Review Comment: Yes - correct, intentional change
Re: [PR] [SS][SPARK-46928] Add support for ListState in Arbitrary State API v2. [spark]
anishshri-db commented on code in PR #44961: URL: https://github.com/apache/spark/pull/44961#discussion_r1489903465 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ListStateImpl.scala: ## @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.streaming + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder +import org.apache.spark.sql.execution.streaming.state.{StateStore, StateStoreErrors} +import org.apache.spark.sql.streaming.ListState + +/** + * Provides concrete implementation for list of values associated with a state variable + * used in the streaming transformWithState operator. + * + * @param store - reference to the StateStore instance to be used for storing state + * @param stateName - name of logical state partition + * @tparam S - data type of object that will be stored in the list + */ +class ListStateImpl[S](store: StateStore, + stateName: String, + keyExprEnc: ExpressionEncoder[Any]) + extends ListState[S] with Logging { + + /** Whether state exists or not. 
*/ + override def exists(): Boolean = { + val encodedGroupingKey = StateTypesEncoderUtils.encodeGroupingKey(stateName, keyExprEnc) + val stateValue = store.get(encodedGroupingKey, stateName) + stateValue != null + } + + /** Get the state value if it exists. If the state does not exist in state store, an +* empty iterator is returned. */ + override def get(): Iterator[S] = { + val encodedKey = StateTypesEncoderUtils.encodeGroupingKey(stateName, keyExprEnc) + val unsafeRowValuesIterator = store.valuesIterator(encodedKey, stateName) + new Iterator[S] { + override def hasNext: Boolean = { + unsafeRowValuesIterator.hasNext + } + + override def next(): S = { + val valueUnsafeRow = unsafeRowValuesIterator.next() + StateTypesEncoderUtils.decodeValue(valueUnsafeRow) + } + } + } + + /** Get the list value as an option if it exists and None otherwise. */ + override def getOption(): Option[Iterator[S]] = { + Option(get()) + } + + /** Update the value of the list. */ + override def put(newState: Array[S]): Unit = { + validateNewState(newState) + + if (newState.isEmpty) { + this.clear() Review Comment: Discussed with @sahnib offline, and we think it's better not to allow passing an empty list to the `put` call. So we will explicitly throw an exception in that case, and for `get` we will return an empty iterator when the value does not exist and a non-empty iterator otherwise. We will also remove `getOption` for this case, since its use would be moot.
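The semantics agreed in this thread - `put` rejects an empty list, `get` always returns an iterator (empty when no state exists), and `getOption` is dropped - can be sketched as a minimal model. This is illustrative Python, not the actual Spark implementation; the class and field names are invented:

```python
class ListStateSketch:
    """Minimal model of the discussed ListState contract (names illustrative)."""

    def __init__(self):
        self._values = None  # None means "no state recorded for this key"

    def put(self, new_state):
        # Empty input is rejected up front instead of silently clearing state.
        if not new_state:
            raise ValueError("put() does not accept an empty list")
        self._values = list(new_state)

    def get(self):
        # Always an iterator: empty when no state exists, so callers never
        # need an Option/None check (which is why getOption can be dropped).
        return iter(self._values if self._values is not None else [])

    def clear(self):
        self._values = None
```

Modelled this way, callers never branch on an Option: `list(state.get())` is simply an empty list before the first `put`, while `put([])` fails fast.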
Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]
chaoqin-li1123 commented on code in PR #45023: URL: https://github.com/apache/spark/pull/45023#discussion_r1489903039 ## python/pyspark/sql/datasource.py: ## @@ -298,6 +320,104 @@ def read(self, partition: InputPartition) -> Iterator[Union[Tuple, Row]]: ... +class DataSourceStreamReader(ABC): +""" +A base class for streaming data source readers. Data source stream readers are responsible +for outputting data from a streaming data source. + +.. versionadded: 4.0.0 +""" + +def initialOffset(self) -> dict: +""" +Return the initial offset of the streaming data source. +A new streaming query starts reading data from the initial offset. +If Spark is restarting an existing query, it will restart from the check-pointed offset +rather than the initial one. + +Returns +--- +dict +A dict whose key and values are str type. Review Comment: The serialization is easy, but the deserialization will be tricky. We don't have runtime type information to deserialize plain JSON text back into a Python object (we don't even know which Python type the JSON was serialized from unless we keep extra type information in the JSON).
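The deserialization concern above is easy to demonstrate: a plain JSON round-trip silently loses Python type information, which motivates restricting the offset to a dict of str keys and values. A small sketch, with offset fields invented for illustration:

```python
import json

# A reader might be tempted to track its position with rich Python types.
offset = {"partition": 0, "position": (12, 34)}  # tuple-valued field

restored = json.loads(json.dumps(offset))
# The tuple silently comes back as a list: nothing in the JSON text
# records that "position" was ever a tuple.
assert restored["position"] == [12, 34]
assert not isinstance(restored["position"], tuple)

# A {str: str} offset survives the round-trip unchanged.
safe_offset = {"partition": "0", "position": "12:34"}
assert json.loads(json.dumps(safe_offset)) == safe_offset
```

A generic deserializer cannot restore the original types without extra type metadata in the JSON, which is exactly the trade-off the comment describes.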
Re: [PR] [SPARK-47043][BUILD] add `jackson-core` and `jackson-annotations` dependencies to module `spark-common-utils` [spark]
dongjoon-hyun commented on PR #45103: URL: https://github.com/apache/spark/pull/45103#issuecomment-1944373538 No problem. I just wanted to inform you. I fixed the field by removing my name. :)
Re: [PR] [SPARK-47043][BUILD] add `jackson-core` and `jackson-annotations` dependencies to module `spark-common-utils` [spark]
William1104 commented on PR #45103: URL: https://github.com/apache/spark/pull/45103#issuecomment-1944365291 Hi @dongjoon-hyun, I am sorry about that. I created the JIRA via cloning, and I don't know how to update the assignee back to myself. Would you mind changing it back to me, or letting me know how I can update the assignee? Best regards, William
Re: [PR] [SPARK-43259][SQL] Assign a name to the error class _LEGACY_ERROR_TEMP_2024 [spark]
MaxGekk commented on code in PR #45095: URL: https://github.com/apache/spark/pull/45095#discussion_r1489876902 ## sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala: ## @@ -1151,6 +1153,21 @@ class QueryExecutionErrorsSuite ) ) } + + test("SPARK-43259: Uses unsupported custom encoder") { +class CustomEncoder extends Encoder[Int] { + override def schema: StructType = StructType(Array.empty[StructField]) + override def clsTag: ClassTag[Int] = ClassTag.Int +} +val e = intercept[SparkRuntimeException] { + encoderFor(new CustomEncoder) Review Comment: Can you try to reproduce the issue using the public Spark SQL API (SQL or the Java/Scala/Python API)? If that is not possible, we should convert the error to an internal one.
Re: [PR] [SPARK-47015][Collation] Disable partitioning on collated columns [spark]
MaxGekk commented on code in PR #45104: URL: https://github.com/apache/spark/pull/45104#discussion_r1489873175 ## sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala: ## @@ -174,4 +174,36 @@ class CollationSuite extends QueryTest with SharedSparkSession { Row(expected)) } } + + + test("disable partition on collated string column") { +def createTable(partitionColumns: String*): Unit = { + val tableName = "test_partition_tbl" + withTable(tableName) { +sql( + s""" + |CREATE TABLE $tableName + |(id INT, c1 STRING COLLATE 'UNICODE', c2 string) + |USING parquet + |PARTITIONED BY (${partitionColumns.mkString(",")}) + |""".stripMargin) + } + + // should work fine on non collated columns + createTable("id") + createTable("c2") + createTable("id", "c2") + + Seq(Seq("c1"), Seq("c1", "id"), Seq("c1", "c2")).foreach { partitionColumns => +checkError( + exception = intercept[AnalysisException] { +createTable(partitionColumns: _*) + }, + errorClass = "_LEGACY_ERROR_TEMP_1153", Review Comment: While you are here, could you assign a proper name to the error class and improve the error message format (if needed)?
Re: [PR] [SPARK-47038][BUILD] Remove shaded `protobuf-java` 2.6.1 dependency from `kinesis-asl-assembly` [spark]
dongjoon-hyun closed pull request #45096: [SPARK-47038][BUILD] Remove shaded `protobuf-java` 2.6.1 dependency from `kinesis-asl-assembly` URL: https://github.com/apache/spark/pull/45096
Re: [PR] [SPARK-47038][BUILD] Remove shaded `protobuf-java` 2.6.1 dependency from `kinesis-asl-assembly` [spark]
viirya commented on code in PR #45096: URL: https://github.com/apache/spark/pull/45096#discussion_r1489871042 ## connector/kinesis-asl-assembly/pom.xml: ## @@ -59,16 +59,6 @@ commons-lang provided - - com.google.protobuf - protobuf-java - 2.6.1 - - Review Comment: Oh, I see. It was overriding the protobuf version.
Re: [PR] [SPARK-47038][BUILD] Remove shaded `protobuf-java` 2.6.1 dependency from `kinesis-asl-assembly` [spark]
dongjoon-hyun commented on PR #45096: URL: https://github.com/apache/spark/pull/45096#issuecomment-1944333969 Thank you so much, @viirya !
Re: [PR] [SPARK-47043][BUILD] add `jackson-core` and `jackson-annotations` dependencies to module `spark-common-utils` [spark]
dongjoon-hyun commented on PR #45103: URL: https://github.com/apache/spark/pull/45103#issuecomment-1944331715 BTW, @William1104, please don't borrow someone else's name like this. ![Screenshot 2024-02-14 at 09 59 56](https://github.com/apache/spark/assets/9700541/9dfafa5c-6335-4450-998e-6832c66f8b6b)
Re: [PR] [SPARK-47038][BUILD] Remove shaded `protobuf-java` 2.6.1 dependency from `kinesis-asl-assembly` [spark]
dongjoon-hyun commented on code in PR #45096: URL: https://github.com/apache/spark/pull/45096#discussion_r1489867281 ## connector/kinesis-asl-assembly/pom.xml: ## @@ -59,16 +59,6 @@ commons-lang provided - - com.google.protobuf - protobuf-java - 2.6.1 - - Review Comment: Yes, it's still a `provided` dependency, as I described in the PR description. ![Screenshot 2024-02-14 at 09 58 01](https://github.com/apache/spark/assets/9700541/b7e829a5-f1cc-47d4-85b4-19cedcf0e46b)
Re: [PR] [SS][SPARK-46928] Add support for ListState in Arbitrary State API v2. [spark]
anishshri-db commented on code in PR #44961: URL: https://github.com/apache/spark/pull/44961#discussion_r1489865483 ## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala: ## @@ -78,7 +78,7 @@ class StatePartitionReader( StateStoreProvider.createAndInit( stateStoreProviderId, keySchema, valueSchema, numColsPrefixKey, - useColumnFamilies = false, storeConf, hadoopConf.value) + useColumnFamilies = false, storeConf, hadoopConf.value, useMultipleValuesPerKey = false) Review Comment: Filed the ticket here - https://issues.apache.org/jira/browse/SPARK-47047
Re: [PR] [SPARK-47038][BUILD] Remove shaded `protobuf-java` 2.6.1 dependency from `kinesis-asl-assembly` [spark]
viirya commented on code in PR #45096: URL: https://github.com/apache/spark/pull/45096#discussion_r1489864752 ## connector/kinesis-asl-assembly/pom.xml: ## @@ -59,16 +59,6 @@ commons-lang provided - - com.google.protobuf - protobuf-java - 2.6.1 - - Review Comment: Before SPARK-14421, it was a provided dependency. Don't we need to keep it?
Re: [PR] [SS][SPARK-46928] Add support for ListState in Arbitrary State API v2. [spark]
anishshri-db commented on code in PR #44961: URL: https://github.com/apache/spark/pull/44961#discussion_r1489863110

## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala: ##

@@ -65,6 +65,43 @@ private[sql] class RocksDBStateStoreProvider
     value
   }

+  override def valuesIterator(key: UnsafeRow, colFamilyName: String): Iterator[UnsafeRow] = {
+    verify(key != null, "Key cannot be null")
+    verify(encoder.supportsMultipleValuesPerKey, "valuesIterator requires an encoder " +
+      "that supports multiple values for a single key.")
+    val valueIterator = encoder.decodeValues(rocksDB.get(encoder.encodeKey(key), colFamilyName))
+
+    if (!isValidated && valueIterator.nonEmpty) {

Review Comment: @HeartSaVioR - is it ok to remove this check everywhere? I can create the change in a separate PR. I don't think this is giving us much currently.
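The `valuesIterator` in the diff above relies on an encoder that can pack several values for one key into a single RocksDB value and decode them back lazily. A minimal, self-contained sketch of that idea, assuming hypothetical `encodeValues`/`decodeValues` helpers with a 4-byte length prefix per value (this is not Spark's actual `RocksDBStateEncoder`, just the shape of a multiple-values-per-key encoding):

```scala
import java.nio.ByteBuffer

// Hypothetical multi-value encoding: each value is written behind a 4-byte
// length prefix, so one key maps to a single byte array holding all values.
def encodeValues(values: Seq[Array[Byte]]): Array[Byte] = {
  val buf = ByteBuffer.allocate(values.map(_.length + 4).sum)
  values.foreach { v => buf.putInt(v.length); buf.put(v) }
  buf.array()
}

// Decode lazily: each next() reads one length-prefixed value, mirroring the
// Iterator[UnsafeRow] that the real valuesIterator returns.
def decodeValues(bytes: Array[Byte]): Iterator[Array[Byte]] = new Iterator[Array[Byte]] {
  private val buf = ByteBuffer.wrap(bytes)
  def hasNext: Boolean = buf.remaining() > 0
  def next(): Array[Byte] = {
    val len = buf.getInt()
    val value = new Array[Byte](len)
    buf.get(value)
    value
  }
}
```

Round-tripping `encodeValues` through `decodeValues` yields the original value sequence in order.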
Re: [PR] [SPARK-47038][BUILD] Remove shaded `protobuf-java` 2.6.1 dependency from `kinesis-asl-assembly` [spark]
viirya commented on PR #45096: URL: https://github.com/apache/spark/pull/45096#issuecomment-1944317618

Looking into this.
Re: [PR] [SPARK-47038][BUILD] Remove shaded `protobuf-java` 2.6.1 dependency from `kinesis-asl-assembly` [spark]
dongjoon-hyun commented on PR #45096: URL: https://github.com/apache/spark/pull/45096#issuecomment-1944313907

Could you review this, please, @viirya?
Re: [PR] [SPARK-47038][BUILD] Remove shaded `protobuf-java` 2.6.1 dependency from `kinesis-asl-assembly` [spark]
dongjoon-hyun commented on PR #45096: URL: https://github.com/apache/spark/pull/45096#issuecomment-1944313704

All tests passed.
Re: [PR] [SPARK-47036][SS] Cleanup RocksDB file tracking for previously uploaded files if files were deleted from local directory [spark]
sahnib commented on code in PR #45092: URL: https://github.com/apache/spark/pull/45092#discussion_r1489848282

## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala: ##

@@ -1863,6 +1864,91 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
   }
 }

+  test("ensure local files deleted on filesystem" +
+    " are cleaned from dfs file mapping") {
+    def getSSTFiles(dir: File): Set[File] = {
+      val sstFiles = new mutable.HashSet[File]()
+      dir.listFiles().foreach { f =>
+        if (f.isDirectory) {
+          sstFiles ++= getSSTFiles(f)
+        } else {
+          if (f.getName.endsWith(".sst")) {
+            sstFiles.add(f)
+          }
+        }
+      }
+      sstFiles.toSet
+    }
+
+    def filterAndDeleteSSTFiles(dir: File, filesToKeep: Set[File]): Unit = {
+      dir.listFiles().foreach { f =>
+        if (f.isDirectory) {
+          filterAndDeleteSSTFiles(f, filesToKeep)
+        } else {
+          if (!filesToKeep.contains(f) && f.getName.endsWith(".sst")) {
+            logInfo(s"deleting ${f.getAbsolutePath} from local directory")
+            f.delete()
+          }
+        }
+      }
+    }
+
+    withTempDir { dir =>
+      withTempDir { localDir =>
+        val sqlConf = new SQLConf()
+        val dbConf = RocksDBConf(StateStoreConf(sqlConf))
+        logInfo(s"config set to ${dbConf.compactOnCommit}")
+        val hadoopConf = new Configuration()
+        val remoteDir = dir.getCanonicalPath
+        withDB(remoteDir = remoteDir,
+          conf = dbConf,
+          hadoopConf = hadoopConf,
+          localDir = localDir) { db =>
+          db.load(0)
+          db.put("a", "1")
+          db.put("b", "1")
+          db.commit()
+          // upload snapshots (with changelog checkpointing)

Review Comment: The maintenance operation creates a snapshot only if changelog checkpointing is enabled. However, I agree that the comment is confusing because the test runs both with and without changelog checkpointing. Further, the state store does create snapshots on commit based on minDeltasForSnapshots (with changelog checkpointing enabled). I have removed the comment as I think it adds more confusion than it helps.
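The two recursive helpers in the test above (`getSSTFiles` and `filterAndDeleteSSTFiles`) follow a common walk-the-tree pattern: descend into subdirectories and either collect or delete files matching a suffix. A standalone sketch of the collection half, using a hypothetical `collectFilesWithSuffix` rather than the suite's exact helper:

```scala
import java.io.File

// Recursively gather every file under `dir` whose name ends with `suffix`,
// descending into subdirectories, like the test's getSSTFiles does for ".sst".
def collectFilesWithSuffix(dir: File, suffix: String): Set[File] = {
  // listFiles() returns null for non-directories; guard with Option.
  Option(dir.listFiles()).getOrElse(Array.empty[File]).toSet.flatMap { (f: File) =>
    if (f.isDirectory) collectFilesWithSuffix(f, suffix)
    else if (f.getName.endsWith(suffix)) Set(f)
    else Set.empty[File]
  }
}
```

The deletion variant in the test is the same traversal with `f.delete()` in place of collecting, skipping any file present in the keep-set.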
Re: [PR] [SPARK-47039][TESTS] Add a checkstyle rule to ban `commons-lang` in Java code [spark]
dongjoon-hyun closed pull request #45097: [SPARK-47039][TESTS] Add a checkstyle rule to ban `commons-lang` in Java code URL: https://github.com/apache/spark/pull/45097
Re: [PR] [SPARK-47039][TESTS] Add a checkstyle rule to ban `commons-lang` in Java code [spark]
dongjoon-hyun commented on PR #45097: URL: https://github.com/apache/spark/pull/45097#issuecomment-1944303256

Thank you so much, @huaxingao ! Merged to master.
Re: [PR] [SPARK-47042][BUILD] add missing explicit dependency 'commons-lang3' to the module 'spark-common-utils' [spark]
dongjoon-hyun commented on PR #45101: URL: https://github.com/apache/spark/pull/45101#issuecomment-1944302594

I agree with you at this point.

> By explicitly declaring the dependency, we can avoid any unexpected missing dependencies that might occur when upgrading 'commons-text'.

For the following question, since we cannot enumerate all transitive dependencies always, Apache Spark established a way to depend on an explicit manifest file (of the final Apache Spark distribution) and CI checking.

- https://github.com/apache/spark/blob/master/dev/test-dependencies.sh
- https://github.com/apache/spark/blob/master/dev/deps/spark-deps-hadoop-3-hive-2.3

> If it is the established practice within the Spark project to address dependency issues
Re: [PR] [SPARK-47039][TESTS] Add a checkstyle rule to ban `commons-lang` in Java code [spark]
huaxingao commented on PR #45097: URL: https://github.com/apache/spark/pull/45097#issuecomment-1944296112

LGTM. Thanks for the PR!
[PR] [SPARK-47009][Collation] Enable create table support for collation [spark]
stefankandic opened a new pull request, #45105: URL: https://github.com/apache/spark/pull/45105

### What changes were proposed in this pull request?
Adding support for create table with collated columns using parquet

### Why are the changes needed?
In order to support basic DDL operations for collations

### Does this PR introduce _any_ user-facing change?
Yes, users are now able to create tables with collated columns

### How was this patch tested?
With UTs

### Was this patch authored or co-authored using generative AI tooling?
No
Re: [PR] [SPARK-47042][BUILD] add missing explicit dependency 'commons-lang3' to the module 'spark-common-utils' [spark]
William1104 commented on PR #45101: URL: https://github.com/apache/spark/pull/45101#issuecomment-1944292205

Hi @dongjoon-hyun,

Thank you for taking the time to review my pull request. I appreciate your feedback and would like to address the points you raised.

Regarding the declaration of explicit dependencies, I believe it is a common practice to declare dependencies explicitly when a module contains a class that directly depends on a specific library. This practice ensures that the dependencies are clearly defined and helps mitigate potential issues that may arise in the future.

While a library like 'commons-text' may currently have 'commons-lang3' as a transitive dependency, there is no guarantee that this relationship will remain unchanged in future releases. By explicitly declaring the dependency, we can avoid any unexpected missing dependencies that might occur when upgrading 'commons-text'.

However, I understand that different projects may have different approaches to handling dependency issues. If it is the established practice within the Spark project to address dependency issues only when they block enhancements or pose critical problems, I am willing to follow that practice and close this pull request accordingly.

Thank you again for your review, and I look forward to any further guidance you may provide.
Re: [PR] [SPARK-47039][TESTS] Add a checkstyle rule to ban `commons-lang` in Java code [spark]
dongjoon-hyun commented on PR #45097: URL: https://github.com/apache/spark/pull/45097#issuecomment-1944262290

Could you review this PR, please, @huaxingao ?
Re: [PR] [SPARK-47039][TESTS] Add a checkstyle rule to ban `commons-lang` in Java code [spark]
dongjoon-hyun commented on PR #45097: URL: https://github.com/apache/spark/pull/45097#issuecomment-1944261211

All tests passed.

![Screenshot 2024-02-14 at 09 12 56](https://github.com/apache/spark/assets/9700541/23f0f2ea-aeb1-4c34-907a-3415fdc002d3)