Re: [PR] [SPARK-47045][SQL] Replace `IllegalArgumentException` by `SparkIllegalArgumentException` in `sql/api` [spark]

2024-02-14 Thread via GitHub


MaxGekk closed pull request #45098: [SPARK-47045][SQL] Replace 
`IllegalArgumentException` by `SparkIllegalArgumentException` in `sql/api`
URL: https://github.com/apache/spark/pull/45098





Re: [PR] [SPARK-47045][SQL] Replace `IllegalArgumentException` by `SparkIllegalArgumentException` in `sql/api` [spark]

2024-02-14 Thread via GitHub


MaxGekk commented on PR #45098:
URL: https://github.com/apache/spark/pull/45098#issuecomment-1945501972

   Merging to master. Thank you, @srielau, for the review.





[PR] [SPARK-47054][PYTHON][TESTS] Remove pinned version of torch for Python 3.12 support [spark]

2024-02-14 Thread via GitHub


HyukjinKwon opened a new pull request, #45113:
URL: https://github.com/apache/spark/pull/45113

   ### What changes were proposed in this pull request?
   
   This PR unpins the torch version in our CI.
   
   ### Why are the changes needed?
   
   To test the latest version. The pinned version also blocks SPARK-46078.
   
   ### Does this PR introduce _any_ user-facing change?
   
   No, dev-only.
   
   ### How was this patch tested?
   
   Manually tested via `./dev/lint-python`.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   No.
   





Re: [PR] [SPARK-47015][Collation] Disable partitioning on collated columns [spark]

2024-02-14 Thread via GitHub


cloud-fan commented on PR #45104:
URL: https://github.com/apache/spark/pull/45104#issuecomment-1945425646

   How about bucket columns? We generate the bucket id from the string value and assume that all semantically-equal string values produce the same bucket id, which isn't true for strings with collation.

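   To make the hashing problem concrete, here is a minimal plain-Scala sketch (an illustration of the general issue, not Spark's bucketing code): two values that are equal under a case-insensitive collation still hash to different bucket ids when the raw characters are hashed.
   
   ```scala
   object CollatedBucketSketch extends App {
     val numBuckets = 10
     // Hash the raw string, as a collation-unaware bucketing scheme would.
     def bucketId(s: String): Int = math.floorMod(s.hashCode, numBuckets)
   
     val a = "aa"
     val b = "AA"
     println(a.equalsIgnoreCase(b))      // true: equal under a case-insensitive collation
     println(bucketId(a) == bucketId(b)) // false: they land in different buckets
   }
   ```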




Re: [PR] [SPARK-47009][SQL] Enable create table support for collation [spark]

2024-02-14 Thread via GitHub


cloud-fan commented on PR #45105:
URL: https://github.com/apache/spark/pull/45105#issuecomment-1945424385

   We should include more high-level information: what is the corresponding Parquet type for a string with collation, and how do we fix the Parquet max/min column stats?

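   As a rough illustration of the stats concern (plain Scala, with Java's case-insensitive comparator standing in for a collation; not Parquet or Spark code), the min/max computed by binary order can disagree with the collation's order, which is what makes stats-based filtering unsound:
   
   ```scala
   object CollatedStatsSketch extends App {
     val values = Seq("a", "B")
   
     // Binary (code point) order, as used for plain string min/max stats: "B" < "a".
     val binaryMin = values.min
   
     // Collation-aware order: "a" < "B", so the two notions of "min" disagree.
     val ciOrdering = Ordering.comparatorToOrdering(java.lang.String.CASE_INSENSITIVE_ORDER)
     val collatedMin = values.min(ciOrdering)
   
     println(s"binary min = $binaryMin, collated min = $collatedMin") // binary min = B, collated min = a
   }
   ```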




Re: [PR] [SPARK-47009][SQL] Enable create table support for collation [spark]

2024-02-14 Thread via GitHub


cloud-fan commented on code in PR #45105:
URL: https://github.com/apache/spark/pull/45105#discussion_r1490442977


##
sql/api/src/main/scala/org/apache/spark/sql/types/DataType.scala:
##
@@ -117,6 +117,7 @@ object DataType {
   private val FIXED_DECIMAL = """decimal\(\s*(\d+)\s*,\s*(\-?\d+)\s*\)""".r
   private val CHAR_TYPE = """char\(\s*(\d+)\s*\)""".r
   private val VARCHAR_TYPE = """varchar\(\s*(\d+)\s*\)""".r
+  private val COLLATED_STRING_TYPE = """string\(\s*([\w_]+)\s*\)""".r

Review Comment:
   Since it's a DDL string, shall we follow the actual SQL syntax and use `STRING COLLATE name`?

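   For illustration, the suggested form could be matched along these lines, mirroring the `CHAR_TYPE`/`VARCHAR_TYPE` patterns above (a hypothetical sketch, not the PR's actual code):
   
   ```scala
   object CollateDdlSketch extends App {
     // Match "STRING COLLATE <name>" rather than "string(<name>)".
     val COLLATED_STRING_TYPE = """(?i)string\s+collate\s+([\w_]+)""".r
   
     "STRING COLLATE UNICODE_CI" match {
       case COLLATED_STRING_TYPE(collation) => println(s"collation = $collation")
       case other => println(s"not a collated string type: $other")
     }
   }
   ```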





Re: [PR] [SPARK-47053][INFRA][3.5] Bump python libraries (pandas, pyarrow) in Docker image for release script [spark]

2024-02-14 Thread via GitHub


HeartSaVioR closed pull request #45111: [SPARK-47053][INFRA][3.5] Bump python 
libraries (pandas, pyarrow) in Docker image for release script
URL: https://github.com/apache/spark/pull/45111





Re: [PR] [SPARK-47053][INFRA][3.5] Bump python libraries (pandas, pyarrow) in Docker image for release script [spark]

2024-02-14 Thread via GitHub


HeartSaVioR commented on PR #45111:
URL: https://github.com/apache/spark/pull/45111#issuecomment-1945420722

   This was merged via 
[9b4778f](https://github.com/apache/spark/commit/9b4778fc1dc7047635c9ec19c187d4e75d182590)





Re: [PR] [SPARK-46687][TESTS][PYTHON][FOLLOW-UP] Skip MemoryProfilerParityTests when codecov enabled [spark]

2024-02-14 Thread via GitHub


HyukjinKwon commented on PR #45112:
URL: https://github.com/apache/spark/pull/45112#issuecomment-1945413337

   Thanks! I manually ran the linters; the other tests won't be verified here, so I'm just merging.
   
   Merged to master.





Re: [PR] [SPARK-46687][TESTS][PYTHON][FOLLOW-UP] Skip MemoryProfilerParityTests when codecov enabled [spark]

2024-02-14 Thread via GitHub


HyukjinKwon closed pull request #45112: [SPARK-46687][TESTS][PYTHON][FOLLOW-UP] 
Skip MemoryProfilerParityTests when codecov enabled
URL: https://github.com/apache/spark/pull/45112





Re: [PR] [SPARK-46906][INFRA][3.5] Bump python libraries (pandas, pyarrow) in Docker image for release script [spark]

2024-02-14 Thread via GitHub


HeartSaVioR commented on PR #45111:
URL: https://github.com/apache/spark/pull/45111#issuecomment-1945412391

   Let me merge this to continue the release process. The 3.5.1 RC will start with RC2.





Re: [PR] [SPARK-46687][TESTS][PYTHON] Skip MemoryProfilerParityTests when codecov enabled [spark]

2024-02-14 Thread via GitHub


HyukjinKwon commented on PR #45112:
URL: https://github.com/apache/spark/pull/45112#issuecomment-1945409614

   cc @xinrong-meng @ueshin 





[PR] [SPARK-46687][TESTS][PYTHON] Skip MemoryProfilerParityTests when codecov enabled [spark]

2024-02-14 Thread via GitHub


HyukjinKwon opened a new pull request, #45112:
URL: https://github.com/apache/spark/pull/45112

   ### What changes were proposed in this pull request?
   
   This is a follow-up of https://github.com/apache/spark/pull/44775 that skips the tests when codecov is enabled. They fail now (https://github.com/apache/spark/actions/runs/7709423681/job/21010676103) and the coverage report is broken.
   
   ### Why are the changes needed?
   
   To recover the test coverage report.
   
   ### Does this PR introduce _any_ user-facing change?
   
   No, test-only.
   
   ### How was this patch tested?
   
   Manually tested.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   No.
   





Re: [PR] [SPARK-47009][Collation] Enable create table support for collation [spark]

2024-02-14 Thread via GitHub


cloud-fan commented on code in PR #45105:
URL: https://github.com/apache/spark/pull/45105#discussion_r1490415076


##
sql/api/src/main/scala/org/apache/spark/sql/catalyst/parser/DataTypeAstBuilder.scala:
##
@@ -58,8 +59,8 @@ class DataTypeAstBuilder extends 
SqlBaseParserBaseVisitor[AnyRef] {
* Resolve/create a primitive type.
*/
   override def visitPrimitiveDataType(ctx: PrimitiveDataTypeContext): DataType 
= withOrigin(ctx) {
-val typeName = ctx.`type`.start.getType
-(typeName, ctx.INTEGER_VALUE().asScala.toList) match {
+val typeCtx = ctx.`type`
+(typeCtx.start.getType, ctx.INTEGER_VALUE().asScala.toList) match {

Review Comment:
   cc @hvanhovell @HyukjinKwon: since we put the SQL parser in the Spark Connect Scala client component, adding new data types means a client version upgrade.






Re: [PR] [SPARK-47009][Collation] Enable create table support for collation [spark]

2024-02-14 Thread via GitHub


cloud-fan commented on code in PR #45105:
URL: https://github.com/apache/spark/pull/45105#discussion_r1490413649


##
sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4:
##
@@ -1095,6 +1095,10 @@ colPosition
 : position=FIRST | position=AFTER afterCol=errorCapturingIdentifier
 ;
 
+collateExpression

Review Comment:
   maybe `collateClause`? This is not really an expression.






Re: [PR] [SPARK-47044] Add executed query for JDBC external datasources to explain output [spark]

2024-02-14 Thread via GitHub


dtenedor commented on code in PR #45102:
URL: https://github.com/apache/spark/pull/45102#discussion_r1490403653


##
sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcSQLQueryBuilder.scala:
##
@@ -81,17 +81,20 @@ class JdbcSQLQueryBuilder(dialect: JdbcDialect, options: 
JDBCOptions) {
   /**
* Constructs the WHERE clause that following dialect's SQL syntax.
*/
-  def withPredicates(predicates: Array[Predicate], part: JDBCPartition): 
JdbcSQLQueryBuilder = {
+  def withPredicates(predicates: Array[Predicate],

Review Comment:
   please start these arguments on the next line, indented by +4 spaces each.

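   For reference, the requested layout would look like this (placeholder types, not the actual Spark signature):
   
   ```scala
   class QueryBuilderFormatSketch {
     // Each parameter goes on its own line, indented four extra spaces relative to `def`.
     def withPredicates(
         predicates: Array[String],
         partWhereClause: String): QueryBuilderFormatSketch = {
       // build the WHERE clause from the predicates and partition clause here
       this
     }
   }
   ```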





Re: [PR] [SPARK-46820][PYTHON] Fix error message regression by restoring `new_msg` [spark]

2024-02-14 Thread via GitHub


HyukjinKwon commented on code in PR #44859:
URL: https://github.com/apache/spark/pull/44859#discussion_r1490402927


##
python/pyspark/sql/types.py:
##
@@ -2197,8 +2197,10 @@ def verify_nullability(obj: Any) -> bool:
 return True
 else:
 raise PySparkValueError(
-error_class="CANNOT_BE_NONE",
-message_parameters={"arg_name": "obj"},
+error_class="FIELD_NOT_NULLABLE",
+message_parameters={
+"field_name": name if name is not None else "",

Review Comment:
   Seems like the error message would look weird if this is an empty string.






Re: [PR] [SPARK-47051][INFRA] Create a new test pipeline for `yarn` and `connect` [spark]

2024-02-14 Thread via GitHub


dongjoon-hyun commented on PR #45107:
URL: https://github.com/apache/spark/pull/45107#issuecomment-1945383372

   Merged to master for now.





Re: [PR] [SPARK-47051][INFRA] Create a new test pipeline for `yarn` and `connect` [spark]

2024-02-14 Thread via GitHub


dongjoon-hyun closed pull request #45107: [SPARK-47051][INFRA] Create a new 
test pipeline for `yarn` and `connect`
URL: https://github.com/apache/spark/pull/45107





Re: [PR] [SPARK-47051][INFRA] Create a new test pipeline for `yarn` and `connect` [spark]

2024-02-14 Thread via GitHub


dongjoon-hyun commented on code in PR #45107:
URL: https://github.com/apache/spark/pull/45107#discussion_r1490390398


##
.github/workflows/build_and_test.yml:
##
@@ -147,8 +147,9 @@ jobs:
 mllib-local, mllib, graphx
   - >-
 streaming, sql-kafka-0-10, streaming-kafka-0-10, 
streaming-kinesis-asl,
-yarn, kubernetes, hadoop-cloud, spark-ganglia-lgpl,
-connect, protobuf
+kubernetes, hadoop-cloud, spark-ganglia-lgpl, protobuf
+  - >-
+yarn, connect

Review Comment:
   From my side, we don't use YARN and Spark Connect.






Re: [PR] [SPARK-47051][INFRA] Create a new test pipeline for `yarn` and `connect` [spark]

2024-02-14 Thread via GitHub


dongjoon-hyun commented on code in PR #45107:
URL: https://github.com/apache/spark/pull/45107#discussion_r1490389222


##
.github/workflows/build_and_test.yml:
##
@@ -147,8 +147,9 @@ jobs:
 mllib-local, mllib, graphx
   - >-
 streaming, sql-kafka-0-10, streaming-kafka-0-10, 
streaming-kinesis-asl,
-yarn, kubernetes, hadoop-cloud, spark-ganglia-lgpl,
-connect, protobuf
+kubernetes, hadoop-cloud, spark-ganglia-lgpl, protobuf
+  - >-
+yarn, connect

Review Comment:
   Thanks! I hoped so too. For now, the `SparkSessionE2ESuite` flakiness and `YarnClusterSuite` seem to be abandoned.






Re: [PR] [SPARK-47051][INFRA] Create a new test pipeline for `yarn` and `connect` [spark]

2024-02-14 Thread via GitHub


HyukjinKwon commented on code in PR #45107:
URL: https://github.com/apache/spark/pull/45107#discussion_r1490374178


##
.github/workflows/build_and_test.yml:
##
@@ -147,8 +147,9 @@ jobs:
 mllib-local, mllib, graphx
   - >-
 streaming, sql-kafka-0-10, streaming-kafka-0-10, 
streaming-kinesis-asl,
-yarn, kubernetes, hadoop-cloud, spark-ganglia-lgpl,
-connect, protobuf
+kubernetes, hadoop-cloud, spark-ganglia-lgpl, protobuf
+  - >-
+yarn, connect

Review Comment:
   Hmm, I think we should really fix the flakiness instead, but I am fine with this as a temporary change.






[PR] [SPARK-46906][INFRA][3.5] Bump python libraries (pandas, pyarrow) in Docker image for release script [spark]

2024-02-14 Thread via GitHub


HeartSaVioR opened a new pull request, #45111:
URL: https://github.com/apache/spark/pull/45111

   ### What changes were proposed in this pull request?
   
   This PR proposes to bump the Python libraries (pandas to 2.0.3, pyarrow to 4.0.0) in the Docker image for the release script.
   
   ### Why are the changes needed?
   
   Without this change, the release script (do-release-docker.sh) fails in the docs phase. This change fixes the release process against branch-3.5.
   
   ### Does this PR introduce _any_ user-facing change?
   
   No.
   
   ### How was this patch tested?
   
   Confirmed with dry-run of release script against branch-3.5.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   No.





Re: [PR] [SPARK-46906][INFRA] Bump python libraries (pandas, pyarrow) in Docker image for release script [spark]

2024-02-14 Thread via GitHub


HeartSaVioR commented on PR #45110:
URL: https://github.com/apache/spark/pull/45110#issuecomment-1945364343

   The docs phase against the master branch + this PR failed "before" the python docs build:
   
   ```
   Your bundle is locked to sass-embedded (1.69.7) from rubygems repository
   https://rubygems.org/ or installed locally, but that version can no longer be
   found in that source. That means the author of sass-embedded (1.69.7) has 
removed
   it. You'll need to update your bundle to a version other than sass-embedded
   (1.69.7) that hasn't been removed in order to install.
   ```
   
   Rebasing this PR onto the master branch doesn't seem to work as expected. I'll submit a PR for branch-3.5, as the cherry-pick has conflicts anyway.





Re: [PR] [SS][SPARK-46928] Add support for ListState in Arbitrary State API v2. [spark]

2024-02-14 Thread via GitHub


HeartSaVioR commented on code in PR #44961:
URL: https://github.com/apache/spark/pull/44961#discussion_r1490354597


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##
@@ -65,6 +65,43 @@ private[sql] class RocksDBStateStoreProvider
   value
 }
 
+override def valuesIterator(key: UnsafeRow, colFamilyName: String): 
Iterator[UnsafeRow] = {
+  verify(key != null, "Key cannot be null")
+  verify(encoder.supportsMultipleValuesPerKey, "valuesIterator requires a 
encoder " +
+  "that supports multiple values for a single key.")
+  val valueIterator = 
encoder.decodeValues(rocksDB.get(encoder.encodeKey(key), colFamilyName))
+
+  if (!isValidated && valueIterator.nonEmpty) {

Review Comment:
   Discussed offline. There have been a couple of cases where inspecting the underlying state row was helpful for debugging. Maybe we can remove this once we improve the state data source reader to produce state key-values as binary, but until then let's keep it as it is for the existing codebase.






Re: [PR] [SPARK-46906][SS] Add a check for stateful operator change for streaming [spark]

2024-02-14 Thread via GitHub


HeartSaVioR commented on PR #44927:
URL: https://github.com/apache/spark/pull/44927#issuecomment-1945339250

   @jingz-db Mind retriggering GA? You can either manually do this in your fork 
or simply push an empty commit to do this automatically. Thanks!





Re: [PR] [SPARK-46906][INFRA] Bump python libraries (pandas, pyarrow) in Docker image for release script [spark]

2024-02-14 Thread via GitHub


HeartSaVioR commented on PR #45110:
URL: https://github.com/apache/spark/pull/45110#issuecomment-1945338259

   I'm now running the release script with a dry run against the master branch. I will update the PR description once it works for the master branch as well; otherwise I'll rebase this PR against branch-3.5.





[PR] [SPARK-46906][INFRA] Bump python libraries (pandas, pyarrow) in Docker image for release script [spark]

2024-02-14 Thread via GitHub


HeartSaVioR opened a new pull request, #45110:
URL: https://github.com/apache/spark/pull/45110

   ### What changes were proposed in this pull request?
   
   This PR proposes to bump the Python libraries (pandas to 2.0.3, pyarrow to 4.0.0) in the Docker image for the release script.
   
   ### Why are the changes needed?
   
   Without this change, the release script (do-release-docker.sh) fails in the docs phase. This change fixes the release process against branch-3.5.
   
   ### Does this PR introduce _any_ user-facing change?
   
   No.
   
   ### How was this patch tested?
   
   Confirmed with dry-run of release script against branch-3.5.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   No.





Re: [PR] [SPARK-47040][CONNECT] Allow Spark Connect Server Script to wait [spark]

2024-02-14 Thread via GitHub


dongjoon-hyun commented on PR #45090:
URL: https://github.com/apache/spark/pull/45090#issuecomment-1945326854

   If so, we need to revert this. Could you confirm that the above works for you, @grundprinzip?





Re: [PR] [SPARK-47040][CONNECT] Allow Spark Connect Server Script to wait [spark]

2024-02-14 Thread via GitHub


dongjoon-hyun commented on PR #45090:
URL: https://github.com/apache/spark/pull/45090#issuecomment-1945326307

   Oh, right. Does it work the same way as this?





Re: [PR] [SPARK-47040][CONNECT] Allow Spark Connect Server Script to wait [spark]

2024-02-14 Thread via GitHub


pan3793 commented on PR #45090:
URL: https://github.com/apache/spark/pull/45090#issuecomment-1945309543

   > ... that leaves it running in the foreground ...
   
   could it be achieved by the following command?
   ```
   SPARK_NO_DAEMONIZE=1 ./sbin/start-connect-server.sh
   ```





Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]

2024-02-14 Thread via GitHub


chaoqin-li1123 commented on code in PR #45023:
URL: https://github.com/apache/spark/pull/45023#discussion_r1490256089


##
python/pyspark/sql/streaming/python_streaming_source_runner.py:
##
@@ -0,0 +1,178 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import sys
+import functools
+import json
+from itertools import islice
+from typing import IO, List, Iterator, Iterable
+
+from pyspark.accumulators import _accumulatorRegistry
+from pyspark.errors import PySparkAssertionError, PySparkRuntimeError
+from pyspark.java_gateway import local_connect_and_auth
+from pyspark.serializers import (
+read_int,
+write_int,
+write_with_length,
+SpecialLengths,
+)
+from pyspark.sql.connect.conversion import ArrowTableToRowsConversion, 
LocalDataToArrowConversion
+from pyspark.sql.datasource import DataSource, InputPartition
+from pyspark.sql.pandas.types import to_arrow_schema
+from pyspark.sql.types import (
+_parse_datatype_json_string,
+BinaryType,
+StructType,
+)
+from pyspark.util import handle_worker_exception
+from pyspark.worker_util import (
+check_python_version,
+read_command,
+pickleSer,
+send_accumulator_updates,
+setup_broadcasts,
+setup_memory_limits,
+setup_spark_files,
+utf8_deserializer,
+)
+
+initial_offset_func_id = 884
+latest_offset_func_id = 885
+partitions_func_id = 886
+commit_func_id = 887
+
+
+def initial_offset_func(reader, outfile):
+offset = reader.initialOffset()
+write_with_length(json.dumps(offset).encode("utf-8"), outfile)
+
+
+def latest_offset_func(reader, outfile):
+offset = reader.latestOffset()
+write_with_length(json.dumps(offset).encode("utf-8"), outfile)
+
+
+def partitions_func(reader, infile, outfile):
+start_offset = json.loads(utf8_deserializer.loads(infile))
+end_offset = json.loads(utf8_deserializer.loads(infile))
+partitions = reader.partitions(start_offset, end_offset)
+# Return the serialized partition values.
+write_int(len(partitions), outfile)
+for partition in partitions:
+pickleSer._write_with_length(partition, outfile)
+
+
+def commit_func(reader, infile, outfile):
+end_offset = json.loads(utf8_deserializer.loads(infile))
+reader.commit(end_offset)
+write_int(0, outfile)
+
+
+def main(infile: IO, outfile: IO) -> None:
+try:
+check_python_version(infile)
+setup_spark_files(infile)
+
+memory_limit_mb = int(os.environ.get("PYSPARK_PLANNER_MEMORY_MB", 
"-1"))
+setup_memory_limits(memory_limit_mb)
+
+_accumulatorRegistry.clear()
+
+# Receive the data source instance.
+data_source = read_command(pickleSer, infile)
+
+if not isinstance(data_source, DataSource):
+raise PySparkAssertionError(
+error_class="PYTHON_DATA_SOURCE_TYPE_MISMATCH",
+message_parameters={
+"expected": "a Python data source instance of type 
'DataSource'",
+"actual": f"'{type(data_source).__name__}'",
+},
+)
+
+# Receive the data source output schema.
+schema_json = utf8_deserializer.loads(infile)

Review Comment:
   We only use the data source schema when the user doesn't specify one.
   
https://github.com/apache/spark/blob/736d8ab3f00e7c5ba1b01c22f6398b636b8492ea/python/pyspark/sql/worker/create_data_source.py#L144






Re: [PR] [SPARK-47051][INFRA] Create a new test pipeline for `yarn` and `connect` [spark]

2024-02-14 Thread via GitHub


dongjoon-hyun commented on code in PR #45107:
URL: https://github.com/apache/spark/pull/45107#discussion_r1490236219


##
.github/workflows/build_and_test.yml:
##
@@ -147,8 +147,9 @@ jobs:
 mllib-local, mllib, graphx
   - >-
 streaming, sql-kafka-0-10, streaming-kafka-0-10, 
streaming-kinesis-asl,
-yarn, kubernetes, hadoop-cloud, spark-ganglia-lgpl,
-connect, protobuf
+kubernetes, hadoop-cloud, spark-ganglia-lgpl, protobuf
+  - >-
+yarn, connect

Review Comment:
   Now, the `streaming, ...` pipeline run time is down to 62m. So, the total 
increased overhead is around 20min.
   
   ![Screenshot 2024-02-14 at 16 47 28](https://github.com/apache/spark/assets/9700541/18ba8ad1-3295-4aff-b853-bbc30f3c830a)
   






Re: [PR] [SPARK-47051][INFRA] Create a new test pipeline for `yarn` and `connect` [spark]

2024-02-14 Thread via GitHub


dongjoon-hyun commented on code in PR #45107:
URL: https://github.com/apache/spark/pull/45107#discussion_r1490233363


##
.github/workflows/build_and_test.yml:
##
@@ -147,8 +147,9 @@ jobs:
 mllib-local, mllib, graphx
   - >-
 streaming, sql-kafka-0-10, streaming-kafka-0-10, 
streaming-kinesis-asl,
-yarn, kubernetes, hadoop-cloud, spark-ganglia-lgpl,
-connect, protobuf
+kubernetes, hadoop-cloud, spark-ganglia-lgpl, protobuf
+  - >-
+yarn, connect

Review Comment:
   Yeah, I was also reluctant because of the build time; specifically, it will take `34 minutes` of total run time. The second goal is to reduce the re-trigger time. Previously, the YARN/Connect/Kafka modules had a bad synergy because their failure rates multiplied, and I needed to re-trigger and wait 70~90 minutes for this pipeline.
   
   ![Screenshot 2024-02-14 at 16 39 19](https://github.com/apache/spark/assets/9700541/e1afe7d3-8c0c-4ac0-93d0-34c810eb6366)
   






Re: [PR] [SPARK-47051][INFRA] Create a new test pipeline for `yarn` and `connect` [spark]

2024-02-14 Thread via GitHub


dongjoon-hyun commented on code in PR #45107:
URL: https://github.com/apache/spark/pull/45107#discussion_r1490233363


##
.github/workflows/build_and_test.yml:
##
@@ -147,8 +147,9 @@ jobs:
 mllib-local, mllib, graphx
   - >-
 streaming, sql-kafka-0-10, streaming-kafka-0-10, 
streaming-kinesis-asl,
-yarn, kubernetes, hadoop-cloud, spark-ganglia-lgpl,
-connect, protobuf
+kubernetes, hadoop-cloud, spark-ganglia-lgpl, protobuf
+  - >-
+yarn, connect

Review Comment:
   Yeah, I was also reluctant. It will take `34 minutes` of total run time. The second goal is to reduce the re-trigger time. Previously, the YARN/Connect/Kafka modules had a bad synergy because their failure rates multiplied, and I needed to re-trigger and wait 70~90 minutes for this pipeline.
   
   ![Screenshot 2024-02-14 at 16 39 19](https://github.com/apache/spark/assets/9700541/e1afe7d3-8c0c-4ac0-93d0-34c810eb6366)
   






Re: [PR] [SPARK-45396][PYTHON] Add doc entry for `pyspark.ml.connect` module, and adds `Evaluator` to `__all__` at `ml.connect` [spark]

2024-02-14 Thread via GitHub


HyukjinKwon commented on PR #43210:
URL: https://github.com/apache/spark/pull/43210#issuecomment-1945142540

   Reverted at 
https://github.com/apache/spark/commit/ea6b25767fb86732c108c759fd5393caee22f129 
in branch-3.5





Re: [PR] [Don't merge & review] verify sbt on master [spark]

2024-02-14 Thread via GitHub


github-actions[bot] closed pull request #43079: [Don't merge & review] verify 
sbt on master
URL: https://github.com/apache/spark/pull/43079





Re: [PR] [SPARK-45669][CORE] Ensure the continuity of rolling log index [spark]

2024-02-14 Thread via GitHub


github-actions[bot] commented on PR #43534:
URL: https://github.com/apache/spark/pull/43534#issuecomment-1945132826

   We're closing this PR because it hasn't been updated in a while. This isn't 
a judgement on the merit of the PR in any way. It's just a way of keeping the 
PR queue manageable.
   If you'd like to revive this PR, please reopen it and ask a committer to 
remove the Stale tag!





Re: [PR] [SPARK-45396][PYTHON] Add doc entry for `pyspark.ml.connect` module, and adds `Evaluator` to `__all__` at `ml.connect` [spark]

2024-02-14 Thread via GitHub


HyukjinKwon commented on PR #43210:
URL: https://github.com/apache/spark/pull/43210#issuecomment-1945132337

   Let's revert it in branch-3.5 and fix it again. It's not a critical bug.





Re: [PR] [SPARK-47051][INFRA] Create a new test pipeline for `yarn` and `connect` [spark]

2024-02-14 Thread via GitHub


HyukjinKwon commented on code in PR #45107:
URL: https://github.com/apache/spark/pull/45107#discussion_r1490216663


##
.github/workflows/build_and_test.yml:
##
@@ -147,8 +147,9 @@ jobs:
 mllib-local, mllib, graphx
   - >-
 streaming, sql-kafka-0-10, streaming-kafka-0-10, 
streaming-kinesis-asl,
-yarn, kubernetes, hadoop-cloud, spark-ganglia-lgpl,
-connect, protobuf
+kubernetes, hadoop-cloud, spark-ganglia-lgpl, protobuf
+  - >-
+yarn, connect

Review Comment:
   Hmm... does separating the build help with stabilization? I am not against this change, but splitting it will add about 30 minutes of build time. Although it runs in parallel, an individual fork has some resource limitations.






Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]

2024-02-14 Thread via GitHub


chaoqin-li1123 commented on code in PR #45023:
URL: https://github.com/apache/spark/pull/45023#discussion_r1490215001


##
sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonStreamingDataSourceSuite.scala:
##
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.python
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.AnalysisException
+import 
org.apache.spark.sql.IntegratedUDFTestUtils.{createUserDefinedPythonDataSource, 
shouldTestPandasUDFs}
+import 
org.apache.spark.sql.execution.datasources.v2.python.{PythonDataSourceV2, 
PythonMicroBatchStream, PythonStreamingSourceOffset}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+class PythonStreamingDataSourceSuite extends PythonDataSourceSuiteBase {
+
+  protected def simpleDataStreamReaderScript: String =
+"""
+  |from pyspark.sql.datasource import DataSourceStreamReader, 
InputPartition
+  |
+  |class SimpleDataStreamReader(DataSourceStreamReader):
+  |def initialOffset(self):
+  |return {"0": "2"}
+  |def latestOffset(self):
+  |return {"0": "2"}

Review Comment:
   Nice suggestion, changed.






Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]

2024-02-14 Thread via GitHub


chaoqin-li1123 commented on code in PR #45023:
URL: https://github.com/apache/spark/pull/45023#discussion_r1490214832


##
sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonStreamingDataSourceSuite.scala:
##
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.python
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.AnalysisException
+import 
org.apache.spark.sql.IntegratedUDFTestUtils.{createUserDefinedPythonDataSource, 
shouldTestPandasUDFs}
+import 
org.apache.spark.sql.execution.datasources.v2.python.{PythonDataSourceV2, 
PythonMicroBatchStream, PythonStreamingSourceOffset}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+class PythonStreamingDataSourceSuite extends PythonDataSourceSuiteBase {
+
+  protected def simpleDataStreamReaderScript: String =
+"""
+  |from pyspark.sql.datasource import DataSourceStreamReader, 
InputPartition
+  |
+  |class SimpleDataStreamReader(DataSourceStreamReader):
+  |def initialOffset(self):
+  |return {"0": "2"}
+  |def latestOffset(self):
+  |return {"0": "2"}
+  |def partitions(self, start: dict, end: dict):
+  |return [InputPartition(i) for i in range(int(start["0"]))]
+  |def commit(self, end: dict):
+  |1 + 2
+  |def read(self, partition):
+  |yield (0, partition.value)
+  |yield (1, partition.value)
+  |yield (2, partition.value)
+  |""".stripMargin
+
+  protected def errorDataStreamReaderScript: String =
+"""
+  |from pyspark.sql.datasource import DataSourceStreamReader, 
InputPartition
+  |
+  |class ErrorDataStreamReader(DataSourceStreamReader):
+  |def initialOffset(self):
+  |raise Exception("error reading initial offset")
+  |def latestOffset(self):
+  |raise Exception("error reading latest offset")
+  |def partitions(self, start: dict, end: dict):
+  |raise Exception("error planning partitions")
+  |def commit(self, end: dict):
+  |raise Exception("error committing offset")
+  |def read(self, partition):
+  |yield (0, partition.value)
+  |yield (1, partition.value)
+  |yield (2, partition.value)
+  |""".stripMargin
+
+  private val errorDataSourceName = "ErrorDataSource"
+
+  test("simple data stream source") {
+assume(shouldTestPandasUDFs)
+val dataSourceScript =
+  s"""
+ |from pyspark.sql.datasource import DataSource
+ |$simpleDataStreamReaderScript
+ |
+ |class $dataSourceName(DataSource):
+ |def streamReader(self, schema):
+ |return SimpleDataStreamReader()
+ |""".stripMargin
+val inputSchema = StructType.fromDDL("input BINARY")
+
+val dataSource = createUserDefinedPythonDataSource(dataSourceName, 
dataSourceScript)
+spark.dataSource.registerPython(dataSourceName, dataSource)
+val pythonDs = new PythonDataSourceV2
+pythonDs.setShortName("SimpleDataSource")
+val stream = new PythonMicroBatchStream(
+  pythonDs, dataSourceName, inputSchema, CaseInsensitiveStringMap.empty())
+
+val initialOffset = stream.initialOffset()
+assert(initialOffset.json == "{\"0\": \"2\"}")
+for (_ <- 1 to 50) {
+  val offset = stream.latestOffset()
+  assert(offset.json == "{\"0\": \"2\"}")
+  assert(stream.planInputPartitions(offset, offset).size == 2)
+  stream.commit(offset)
+}
+stream.stop()
+  }
+
+  test("Error creating stream reader") {
+assume(shouldTestPandasUDFs)
+val dataSourceScript =
+  s"""
+ |from pyspark.sql.datasource import DataSource
+ |class $dataSourceName(DataSource):
+ |def streamReader(self, schema):
+ |raise Exception("error creating stream reader")
+ |""".stripMargin
+val dataSource = createUserDefinedPythonDataSource(
+  name = dataSourceName, pythonScript = dataSourceScript)
+spark.dataSource.registerPython(dataSourceName, dataSourc

Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]

2024-02-14 Thread via GitHub


chaoqin-li1123 commented on code in PR #45023:
URL: https://github.com/apache/spark/pull/45023#discussion_r1490214280


##
sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonStreamingSourceRunner.scala:
##
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.spark.sql.execution.python
+
+import java.io.{BufferedInputStream, BufferedOutputStream, DataInputStream, 
DataOutputStream}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.jdk.CollectionConverters._
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.api.python.{PythonFunction, PythonWorker, 
PythonWorkerFactory, PythonWorkerUtils, SpecialLengths}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.BUFFER_SIZE
+import org.apache.spark.internal.config.Python.{PYTHON_AUTH_SOCKET_TIMEOUT, 
PYTHON_USE_DAEMON}
+import org.apache.spark.sql.errors.{QueryCompilationErrors, 
QueryExecutionErrors}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+
+object PythonStreamingSourceRunner {
+  val initialOffsetFuncId = 884
+  val latestOffsetFuncId = 885
+  val partitionsFuncId = 886
+  val commitFuncId = 887
+}
+
+/**
+ * This class is a proxy to invoke methods in Python DataSourceStreamReader 
from JVM.
+ * A runner spawns a python worker process. In the main function, set up 
communication
+ * between JVM and python process through socket and create a 
DataSourceStreamReader instance.
+ * In an infinite loop, the python worker process poll information(function 
name and parameters)
+ * from the socket, invoke the corresponding method of StreamReader and send 
return value to JVM.
+ */
+class PythonStreamingSourceRunner(
+func: PythonFunction,
+outputSchema: StructType) extends Logging  {
+  val workerModule = "pyspark.sql.streaming.python_streaming_source_runner"
+
+  private val conf = SparkEnv.get.conf
+  protected val bufferSize: Int = conf.get(BUFFER_SIZE)
+  protected val authSocketTimeout = conf.get(PYTHON_AUTH_SOCKET_TIMEOUT)
+
+  private val envVars: java.util.Map[String, String] = func.envVars
+  private val pythonExec: String = func.pythonExec
+  private var pythonWorker: Option[PythonWorker] = None
+  private var pythonWorkerFactory: Option[PythonWorkerFactory] = None
+  protected val pythonVer: String = func.pythonVer
+
+  private var dataOut: DataOutputStream = null
+  private var dataIn: DataInputStream = null
+
+  import PythonStreamingSourceRunner._
+
+  /**
+   * Initializes the Python worker for running the streaming source.
+   */
+  def init(): Unit = {
+logInfo(s"Initializing Python runner pythonExec: $pythonExec")
+val env = SparkEnv.get
+
+val localdir = env.blockManager.diskBlockManager.localDirs.map(f => 
f.getPath()).mkString(",")
+envVars.put("SPARK_LOCAL_DIRS", localdir)
+
+envVars.put("SPARK_AUTH_SOCKET_TIMEOUT", authSocketTimeout.toString)
+envVars.put("SPARK_BUFFER_SIZE", bufferSize.toString)
+
+val prevConf = conf.get(PYTHON_USE_DAEMON)
+conf.set(PYTHON_USE_DAEMON, false)
+try {
+  val workerFactory =
+new PythonWorkerFactory(pythonExec, workerModule, 
envVars.asScala.toMap)
+  val (worker: PythonWorker, _) = 
workerFactory.createSimpleWorker(blockingMode = true)
+  pythonWorker = Some(worker)
+  pythonWorkerFactory = Some(workerFactory)
+} finally {
+  conf.set(PYTHON_USE_DAEMON, prevConf)
+}
+
+val stream = new BufferedOutputStream(
+  pythonWorker.get.channel.socket().getOutputStream, bufferSize)
+dataOut = new DataOutputStream(stream)
+
+PythonWorkerUtils.writePythonVersion(pythonVer, dataOut)
+
+val pythonIncludes = func.pythonIncludes.asScala.toSet
+PythonWorkerUtils.writeSparkFiles(Some("streaming_job"), pythonIncludes, 
dataOut)
+
+// Send the user function to python process
+PythonWorkerUtils.writePythonFunction(func, dataOut)
+
+// Send output schema
+PythonWorkerUtils.writeUTF(outputSchema.json, dataOut)
+
+// Send configurations
+dataOut.writeInt(SQLConf.get.arrowMaxRecordsPerBatch)
+dataOut.flush()
+
+dataIn = new DataInputStream(
+  new 
BufferedInputStream(pytho

Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]

2024-02-14 Thread via GitHub


chaoqin-li1123 commented on code in PR #45023:
URL: https://github.com/apache/spark/pull/45023#discussion_r1490213270


##
python/pyspark/sql/streaming/python_streaming_source_runner.py:
##
@@ -0,0 +1,178 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import sys
+import functools
+import json
+from itertools import islice
+from typing import IO, List, Iterator, Iterable
+
+from pyspark.accumulators import _accumulatorRegistry
+from pyspark.errors import PySparkAssertionError, PySparkRuntimeError
+from pyspark.java_gateway import local_connect_and_auth
+from pyspark.serializers import (
+read_int,
+write_int,
+write_with_length,
+SpecialLengths,
+)
+from pyspark.sql.connect.conversion import ArrowTableToRowsConversion, 
LocalDataToArrowConversion
+from pyspark.sql.datasource import DataSource, InputPartition
+from pyspark.sql.pandas.types import to_arrow_schema
+from pyspark.sql.types import (
+_parse_datatype_json_string,
+BinaryType,
+StructType,
+)
+from pyspark.util import handle_worker_exception
+from pyspark.worker_util import (
+check_python_version,
+read_command,
+pickleSer,
+send_accumulator_updates,
+setup_broadcasts,
+setup_memory_limits,
+setup_spark_files,
+utf8_deserializer,
+)
+
+initial_offset_func_id = 884
+latest_offset_func_id = 885
+partitions_func_id = 886
+commit_func_id = 887
+
+
+def initial_offset_func(reader, outfile):
+offset = reader.initialOffset()
+write_with_length(json.dumps(offset).encode("utf-8"), outfile)
+
+
+def latest_offset_func(reader, outfile):
+offset = reader.latestOffset()
+write_with_length(json.dumps(offset).encode("utf-8"), outfile)
+
+
+def partitions_func(reader, infile, outfile):
+start_offset = json.loads(utf8_deserializer.loads(infile))
+end_offset = json.loads(utf8_deserializer.loads(infile))
+partitions = reader.partitions(start_offset, end_offset)
+# Return the serialized partition values.
+write_int(len(partitions), outfile)
+for partition in partitions:
+pickleSer._write_with_length(partition, outfile)
+
+
+def commit_func(reader, infile, outfile):
+end_offset = json.loads(utf8_deserializer.loads(infile))
+reader.commit(end_offset)
+write_int(0, outfile)
+
+
+def main(infile: IO, outfile: IO) -> None:
+try:
+check_python_version(infile)
+setup_spark_files(infile)
+
+memory_limit_mb = int(os.environ.get("PYSPARK_PLANNER_MEMORY_MB", 
"-1"))
+setup_memory_limits(memory_limit_mb)
+
+_accumulatorRegistry.clear()
+
+# Receive the data source instance.
+data_source = read_command(pickleSer, infile)
+
+if not isinstance(data_source, DataSource):
+raise PySparkAssertionError(
+error_class="PYTHON_DATA_SOURCE_TYPE_MISMATCH",
+message_parameters={
+"expected": "a Python data source instance of type 
'DataSource'",
+"actual": f"'{type(data_source).__name__}'",
+},
+)
+
+# Receive the data source output schema.
+schema_json = utf8_deserializer.loads(infile)
+schema = _parse_datatype_json_string(schema_json)
+if not isinstance(schema, StructType):
+raise PySparkAssertionError(
+error_class="PYTHON_DATA_SOURCE_TYPE_MISMATCH",
+message_parameters={
+"expected": "an output schema of type 'StructType'",
+"actual": f"'{type(schema).__name__}'",
+},
+)
+
+# Receive the configuration values.

Review Comment:
   Removed. It is no longer necessary because the pickled read function will be 
created from another python worker.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spa

Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]

2024-02-14 Thread via GitHub


chaoqin-li1123 commented on code in PR #45023:
URL: https://github.com/apache/spark/pull/45023#discussion_r1490211705


##
python/pyspark/sql/streaming/python_streaming_source_runner.py:
##
@@ -0,0 +1,178 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import sys
+import functools
+import json
+from itertools import islice
+from typing import IO, List, Iterator, Iterable
+
+from pyspark.accumulators import _accumulatorRegistry
+from pyspark.errors import PySparkAssertionError, PySparkRuntimeError
+from pyspark.java_gateway import local_connect_and_auth
+from pyspark.serializers import (
+read_int,
+write_int,
+write_with_length,
+SpecialLengths,
+)
+from pyspark.sql.connect.conversion import ArrowTableToRowsConversion, 
LocalDataToArrowConversion
+from pyspark.sql.datasource import DataSource, InputPartition
+from pyspark.sql.pandas.types import to_arrow_schema
+from pyspark.sql.types import (
+_parse_datatype_json_string,
+BinaryType,
+StructType,
+)
+from pyspark.util import handle_worker_exception
+from pyspark.worker_util import (
+check_python_version,
+read_command,
+pickleSer,
+send_accumulator_updates,
+setup_broadcasts,
+setup_memory_limits,
+setup_spark_files,
+utf8_deserializer,
+)
+
+initial_offset_func_id = 884
+latest_offset_func_id = 885
+partitions_func_id = 886
+commit_func_id = 887
+
+
+def initial_offset_func(reader, outfile):
+offset = reader.initialOffset()
+write_with_length(json.dumps(offset).encode("utf-8"), outfile)
+
+
+def latest_offset_func(reader, outfile):
+offset = reader.latestOffset()
+write_with_length(json.dumps(offset).encode("utf-8"), outfile)
+
+
+def partitions_func(reader, infile, outfile):
+start_offset = json.loads(utf8_deserializer.loads(infile))
+end_offset = json.loads(utf8_deserializer.loads(infile))
+partitions = reader.partitions(start_offset, end_offset)
+# Return the serialized partition values.
+write_int(len(partitions), outfile)
+for partition in partitions:
+pickleSer._write_with_length(partition, outfile)
+
+
+def commit_func(reader, infile, outfile):
+end_offset = json.loads(utf8_deserializer.loads(infile))
+reader.commit(end_offset)
+write_int(0, outfile)
+
+
+def main(infile: IO, outfile: IO) -> None:
+try:
+check_python_version(infile)
+setup_spark_files(infile)
+
+memory_limit_mb = int(os.environ.get("PYSPARK_PLANNER_MEMORY_MB", 
"-1"))
+setup_memory_limits(memory_limit_mb)
+
+_accumulatorRegistry.clear()
+
+# Receive the data source instance.
+data_source = read_command(pickleSer, infile)
+
+if not isinstance(data_source, DataSource):
+raise PySparkAssertionError(
+error_class="PYTHON_DATA_SOURCE_TYPE_MISMATCH",
+message_parameters={
+"expected": "a Python data source instance of type 
'DataSource'",
+"actual": f"'{type(data_source).__name__}'",
+},
+)
+
+# Receive the data source output schema.
+schema_json = utf8_deserializer.loads(infile)
+schema = _parse_datatype_json_string(schema_json)
+if not isinstance(schema, StructType):
+raise PySparkAssertionError(
+error_class="PYTHON_DATA_SOURCE_TYPE_MISMATCH",
+message_parameters={
+"expected": "an output schema of type 'StructType'",
+"actual": f"'{type(schema).__name__}'",
+},
+)
+
+# Receive the configuration values.
+max_arrow_batch_size = read_int(infile)
+assert max_arrow_batch_size > 0, (
+"The maximum arrow batch size should be greater than 0, but got "
+f"'{max_arrow_batch_size}'"
+)
+
+# Instantiate data source reader.
+try:
+reader = data_source.streamReader(schema=schema)
+# Initialization succeed.
+write_int(0, outfile)
+outfile.flush()
+
+# handle method call from socket
+while True:
+func_id = read_int(infile)
+

[PR] [SPARK-47052][WIP] Separate state tracking variables from MicroBatchExecution/StreamExecution [spark]

2024-02-14 Thread via GitHub


jerrypeng opened a new pull request, #45109:
URL: https://github.com/apache/spark/pull/45109

   
   
   
   ### What changes were proposed in this pull request?
   
   
   To improve code clarity and maintainability, I propose that we move all the
   variables that track mutable state and metrics for a streaming query into a
   separate class. With this refactor, it would be easy to track and find all the
   mutable state a microbatch can have.
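   A rough sketch of the direction (the class and field names below are illustrative
   guesses for this description, not the actual code in the PR):
   
   ```scala
   import scala.collection.mutable
   
   // Sketch: group the mutable progress-tracking fields of a streaming query in one
   // place instead of spreading them across MicroBatchExecution/StreamExecution.
   class StreamExecutionState {
     var currentBatchId: Long = -1                  // batch currently being planned or run
     var isCurrentBatchConstructed: Boolean = false // whether the current batch plan was built
     var noNewData: Boolean = false                 // whether the last attempt saw no new data
     val watermarkMsMap: mutable.Map[Int, Long] = mutable.Map.empty // per-operator watermarks
   }
   ```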
   
   
   ### Why are the changes needed?
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   
   ### How was this patch tested?
   
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47051][INFRA] Create a new test pipeline for `yarn` and `connect` [spark]

2024-02-14 Thread via GitHub


dongjoon-hyun commented on PR #45107:
URL: https://github.com/apache/spark/pull/45107#issuecomment-1945067506

   Could you review this test pipeline adjustment PR, @HyukjinKwon ?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[PR] [wip] value state ttl poc [spark]

2024-02-14 Thread via GitHub


ericm-db opened a new pull request, #45108:
URL: https://github.com/apache/spark/pull/45108

   
   
   ### What changes were proposed in this pull request?
   
   
   
   ### Why are the changes needed?
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   
   ### How was this patch tested?
   
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[PR] [SPARK-47051][INFRA] Create a new test pipeline for `yarn` and `connect` [spark]

2024-02-14 Thread via GitHub


dongjoon-hyun opened a new pull request, #45107:
URL: https://github.com/apache/spark/pull/45107

   
   
   ### What changes were proposed in this pull request?
   
   
   
   ### Why are the changes needed?
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   
   ### How was this patch tested?
   
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47049][BUILD] Ban non-shaded Hadoop dependencies [spark]

2024-02-14 Thread via GitHub


dongjoon-hyun closed pull request #45106: [SPARK-47049][BUILD] Ban non-shaded 
Hadoop dependencies
URL: https://github.com/apache/spark/pull/45106


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47049][BUILD] Ban non-shaded Hadoop dependencies [spark]

2024-02-14 Thread via GitHub


dongjoon-hyun commented on PR #45106:
URL: https://github.com/apache/spark/pull/45106#issuecomment-1944840049

   Thank you, @sunchao . Yes, it's irrelevant to this.
   Merged to master.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]

2024-02-14 Thread via GitHub


HeartSaVioR commented on code in PR #45023:
URL: https://github.com/apache/spark/pull/45023#discussion_r1490134034


##
python/pyspark/sql/datasource.py:
##
@@ -298,6 +320,104 @@ def read(self, partition: InputPartition) -> 
Iterator[Union[Tuple, Row]]:
 ...
 
 
+class DataSourceStreamReader(ABC):
+"""
+A base class for streaming data source readers. Data source stream readers 
are responsible
+for outputting data from a streaming data source.
+
+.. versionadded: 4.0.0
+"""
+
+def initialOffset(self) -> dict:
+"""
+Return the initial offset of the streaming data source.
+A new streaming query starts reading data from the initial offset.
+If Spark is restarting an existing query, it will restart from the 
check-pointed offset
+rather than the initial one.
+
+Returns
+---
+dict
+A dict whose key and values are str type.

Review Comment:
   UPDATE: we are discussing this offline now and it could take time - this is tied
   to the interface and extremely hard to change after it is shipped.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46906][SS] Add a check for stateful operator change for streaming [spark]

2024-02-14 Thread via GitHub


jingz-db commented on code in PR #44927:
URL: https://github.com/apache/spark/pull/44927#discussion_r1490116430


##
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadataSuite.scala:
##
@@ -215,4 +216,117 @@ class OperatorStateMetadataSuite extends StreamTest with 
SharedSparkSession {
 checkError(exc, "STDS_REQUIRED_OPTION_UNSPECIFIED", "42601",
   Map("optionName" -> StateSourceOptions.PATH))
   }
+
+  test("Operator metadata path non-existence should not fail query") {
+withTempDir { checkpointDir =>
+  val inputData = MemoryStream[Int]
+  val aggregated =
+inputData.toDF()
+  .groupBy($"value")
+  .agg(count("*"))
+  .as[(Int, Long)]
+
+  testStream(aggregated, Complete)(
+StartStream(checkpointLocation = checkpointDir.toString),
+AddData(inputData, 3),
+CheckLastBatch((3, 1)),
+StopStream
+  )
+
+  // Delete operator metadata path
+  val metadataPath = new Path(checkpointDir.toString, 
s"state/0/_metadata/metadata")
+  val fm = CheckpointFileManager.create(new 
Path(checkpointDir.getCanonicalPath), hadoopConf)
+  fm.delete(metadataPath)
+
+  // Restart the query
+  testStream(aggregated, Complete)(
+StartStream(checkpointLocation = checkpointDir.toString),
+AddData(inputData, 3),
+CheckLastBatch((3, 2)),
+StopStream
+  )
+}
+  }
+
+  test("Changing operator - " +
+"replace, add, remove operators will trigger error with debug message") {
+withTempDir { checkpointDir =>
+  val inputData = MemoryStream[Int]
+  val stream = inputData.toDF().withColumn("eventTime", 
timestamp_seconds($"value"))
+
+  testStream(stream.dropDuplicates())(
+StartStream(checkpointLocation = checkpointDir.toString),
+AddData(inputData, 1),
+ProcessAllAvailable(),
+StopStream
+  )
+
+  def checkOpChangeError(OpsInMetadataSeq: Seq[String],

Review Comment:
   Got it! Thanks Jungtaek!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]

2024-02-14 Thread via GitHub


chaoqin-li1123 commented on code in PR #45023:
URL: https://github.com/apache/spark/pull/45023#discussion_r1490111896


##
python/pyspark/sql/datasource.py:
##
@@ -298,6 +320,104 @@ def read(self, partition: InputPartition) -> 
Iterator[Union[Tuple, Row]]:
 ...
 
 
+class DataSourceStreamReader(ABC):
+"""
+A base class for streaming data source readers. Data source stream readers 
are responsible
+for outputting data from a streaming data source.
+
+.. versionadded: 4.0.0
+"""
+
+def initialOffset(self) -> dict:
+"""
+Return the initial offset of the streaming data source.
+A new streaming query starts reading data from the initial offset.
+If Spark is restarting an existing query, it will restart from the 
check-pointed offset
+rather than the initial one.
+
+Returns
+---
+dict
+A dict whose key and values are str type.

Review Comment:
   That essentially means we need to add the deserializeOffset() interface to 
the StreamReader for deserialization. Or is there any alternative?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]

2024-02-14 Thread via GitHub


HeartSaVioR commented on code in PR #45023:
URL: https://github.com/apache/spark/pull/45023#discussion_r1490109817


##
python/pyspark/sql/datasource.py:
##
@@ -298,6 +320,104 @@ def read(self, partition: InputPartition) -> 
Iterator[Union[Tuple, Row]]:
 ...
 
 
+class DataSourceStreamReader(ABC):
+"""
+A base class for streaming data source readers. Data source stream readers 
are responsible
+for outputting data from a streaming data source.
+
+.. versionadded: 4.0.0
+"""
+
+def initialOffset(self) -> dict:
+"""
+Return the initial offset of the streaming data source.
+A new streaming query starts reading data from the initial offset.
+If Spark is restarting an existing query, it will restart from the 
check-pointed offset
+rather than the initial one.
+
+Returns
+---
+dict
+A dict whose key and values are str type.

Review Comment:
   Maybe yes. I'm OK with a dict as long as we clearly explain which types can be
   used as values (on the flip side, we still need to restrict the available types
   since we have to convert this to JSON), but it is probably less of a headache to
   let the data source implementation take care of it itself.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]

2024-02-14 Thread via GitHub


HeartSaVioR commented on code in PR #45023:
URL: https://github.com/apache/spark/pull/45023#discussion_r1490109817


##
python/pyspark/sql/datasource.py:
##
@@ -298,6 +320,104 @@ def read(self, partition: InputPartition) -> 
Iterator[Union[Tuple, Row]]:
 ...
 
 
+class DataSourceStreamReader(ABC):
+"""
+A base class for streaming data source readers. Data source stream readers 
are responsible
+for outputting data from a streaming data source.
+
+.. versionadded: 4.0.0
+"""
+
+def initialOffset(self) -> dict:
+"""
+Return the initial offset of the streaming data source.
+A new streaming query starts reading data from the initial offset.
+If Spark is restarting an existing query, it will restart from the 
check-pointed offset
+rather than the initial one.
+
+Returns
+---
+dict
+A dict whose key and values are str type.

Review Comment:
   Maybe yes. I'm OK with a dict as long as we clearly explain which types can be
   used as values, but it is probably less of a headache to let the data source
   implementation take care of it itself.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]

2024-02-14 Thread via GitHub


HeartSaVioR commented on code in PR #45023:
URL: https://github.com/apache/spark/pull/45023#discussion_r1490107780


##
python/pyspark/sql/datasource.py:
##
@@ -298,6 +320,104 @@ def read(self, partition: InputPartition) -> 
Iterator[Union[Tuple, Row]]:
 ...
 
 
+class DataSourceStreamReader(ABC):
+"""
+A base class for streaming data source readers. Data source stream readers 
are responsible
+for outputting data from a streaming data source.
+
+.. versionadded: 4.0.0
+"""
+
+def initialOffset(self) -> dict:
+"""
+Return the initial offset of the streaming data source.
+A new streaming query starts reading data from the initial offset.
+If Spark is restarting an existing query, it will restart from the 
check-pointed offset
+rather than the initial one.
+
+Returns
+---
+dict
+A dict whose key and values are str type.

Review Comment:
   So the data source implementation provides serde between its offset model and
   JSON, and the python worker will handle it with the serde being provided. For
   python worker <-> JVM communication, JSON is used. Does this make sense?
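   
   For illustration, here is a tiny sketch of how that reads from the data source
   author's side (this assumes the DataSourceStreamReader interface proposed in this
   PR; the counter-style source and its values are made up):
   
   ```python
   import json
   
   from pyspark.sql.datasource import DataSourceStreamReader  # interface added by this PR
   
   class CounterStreamReader(DataSourceStreamReader):
       """Toy reader whose offset model is a single integer, carried as {"offset": "<n>"}."""
   
       def initialOffset(self) -> dict:
           return {"offset": "0"}
   
       def latestOffset(self) -> dict:
           return {"offset": "100"}  # made-up upper bound, just for the sketch
   
       def commit(self, end: dict) -> None:
           pass  # nothing to clean up in this toy example
   
   # Across the python worker <-> JVM boundary the dict only ever travels as JSON:
   checkpointed = json.dumps({"offset": "42"})
   assert json.loads(checkpointed) == {"offset": "42"}
   ```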



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]

2024-02-14 Thread via GitHub


chaoqin-li1123 commented on code in PR #45023:
URL: https://github.com/apache/spark/pull/45023#discussion_r1490106637


##
python/pyspark/sql/datasource.py:
##
@@ -298,6 +320,104 @@ def read(self, partition: InputPartition) -> 
Iterator[Union[Tuple, Row]]:
 ...
 
 
+class DataSourceStreamReader(ABC):
+"""
+A base class for streaming data source readers. Data source stream readers 
are responsible
+for outputting data from a streaming data source.
+
+.. versionadded: 4.0.0
+"""
+
+def initialOffset(self) -> dict:
+"""
+Return the initial offset of the streaming data source.
+A new streaming query starts reading data from the initial offset.
+If Spark is restarting an existing query, it will restart from the 
check-pointed offset
+rather than the initial one.
+
+Returns
+---
+dict
+A dict whose key and values are str type.

Review Comment:
   Does that mean we also need to add deserializeOffset() to the python 
interface?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]

2024-02-14 Thread via GitHub


HeartSaVioR commented on code in PR #45023:
URL: https://github.com/apache/spark/pull/45023#discussion_r1490104863


##
python/pyspark/sql/datasource.py:
##
@@ -298,6 +320,104 @@ def read(self, partition: InputPartition) -> 
Iterator[Union[Tuple, Row]]:
 ...
 
 
+class DataSourceStreamReader(ABC):
+"""
+A base class for streaming data source readers. Data source stream readers 
are responsible
+for outputting data from a streaming data source.
+
+.. versionadded: 4.0.0
+"""
+
+def initialOffset(self) -> dict:
+"""
+Return the initial offset of the streaming data source.
+A new streaming query starts reading data from the initial offset.
+If Spark is restarting an existing query, it will restart from the 
check-pointed offset
+rather than the initial one.
+
+Returns
+---
+dict
+A dict whose key and values are str type.

Review Comment:
   We expect the data source to know about its offset model - we pass a JSON
   representation of the offset to the specific data source, which is expected to
   deserialize it successfully into its offset model. It results in a runtime error
   if the JSON form of the offset and the offset model aren't compatible, which I
   think is the same as what the Scala side does.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46906][SS] Add a check for stateful operator change for streaming [spark]

2024-02-14 Thread via GitHub


HeartSaVioR commented on code in PR #44927:
URL: https://github.com/apache/spark/pull/44927#discussion_r1490093460


##
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadataSuite.scala:
##
@@ -215,4 +215,79 @@ class OperatorStateMetadataSuite extends StreamTest with 
SharedSparkSession {
 checkError(exc, "STDS_REQUIRED_OPTION_UNSPECIFIED", "42601",
   Map("optionName" -> StateSourceOptions.PATH))
   }
+
+  test("Operator metadata path non-existence should not fail query") {
+withTempDir { checkpointDir =>
+  val inputData = MemoryStream[Int]
+  val aggregated =
+inputData.toDF()
+  .groupBy($"value")
+  .agg(count("*"))
+  .as[(Int, Long)]
+
+  testStream(aggregated, Complete)(
+StartStream(checkpointLocation = checkpointDir.toString),
+AddData(inputData, 3),
+CheckLastBatch((3, 1)),
+StopStream
+  )
+
+  // Delete operator metadata path
+  val metadataPath = new Path(checkpointDir.toString, 
s"state/0/_metadata/metadata")
+  val fm = CheckpointFileManager.create(new 
Path(checkpointDir.getCanonicalPath), hadoopConf)
+  fm.delete(metadataPath)
+
+  // Restart the query
+  testStream(aggregated, Complete)(
+StartStream(checkpointLocation = checkpointDir.toString),
+AddData(inputData, 3),
+CheckLastBatch((3, 2)),
+StopStream
+  )
+}
+  }
+
+  Seq("Replace", "Add", "Remove").foreach { operation =>
+test(s"$operation stateful operator will trigger error with guidance") {
+  withTempDir { checkpointDir =>
+val inputData = MemoryStream[Int]
+val stream = inputData.toDF().withColumn("eventTime", 
timestamp_seconds($"value"))
+
+testStream(stream.dropDuplicates())(
+  StartStream(checkpointLocation = checkpointDir.toString),
+  AddData(inputData, 1),
+  ProcessAllAvailable(),
+  StopStream)
+
+val (opsInMetadataSeq, opsInCurBatchSeq, restartStream) = operation 
match {
+  case "Add" =>
+(
+  Map(0L -> "dedupe"),
+  Map(0L -> "stateStoreSave", 1L -> "dedupe"),
+  stream.dropDuplicates().groupBy("value").count())
+  case "Replace" =>
+(Map(0L -> "dedupe"), Map(0L -> "stateStoreSave"), 
stream.groupBy("value").count())
+  case "Remove" =>
+(Map(0L -> "dedupe"), Map.empty[Long, String], stream)
+}
+
+testStream(restartStream, Update)(
+  StartStream(checkpointLocation = checkpointDir.toString),
+  AddData(inputData, 3),
+  ExpectFailure[SparkRuntimeException] { t => {

Review Comment:
   Super nit: the extra {} is unnecessary after `{ t =>`; multiple lines are
   allowed after `=>`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47049][BUILD] Ban non-shaded Hadoop dependencies [spark]

2024-02-14 Thread via GitHub


dongjoon-hyun commented on PR #45106:
URL: https://github.com/apache/spark/pull/45106#issuecomment-1944653099

   Could you review this, please, @sunchao ?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46906][SS] Add a check for stateful operator change for streaming [spark]

2024-02-14 Thread via GitHub


HeartSaVioR commented on code in PR #44927:
URL: https://github.com/apache/spark/pull/44927#discussion_r1490087695


##
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadataSuite.scala:
##
@@ -215,4 +216,117 @@ class OperatorStateMetadataSuite extends StreamTest with 
SharedSparkSession {
 checkError(exc, "STDS_REQUIRED_OPTION_UNSPECIFIED", "42601",
   Map("optionName" -> StateSourceOptions.PATH))
   }
+
+  test("Operator metadata path non-existence should not fail query") {
+withTempDir { checkpointDir =>
+  val inputData = MemoryStream[Int]
+  val aggregated =
+inputData.toDF()
+  .groupBy($"value")
+  .agg(count("*"))
+  .as[(Int, Long)]
+
+  testStream(aggregated, Complete)(
+StartStream(checkpointLocation = checkpointDir.toString),
+AddData(inputData, 3),
+CheckLastBatch((3, 1)),
+StopStream
+  )
+
+  // Delete operator metadata path
+  val metadataPath = new Path(checkpointDir.toString, 
s"state/0/_metadata/metadata")
+  val fm = CheckpointFileManager.create(new 
Path(checkpointDir.getCanonicalPath), hadoopConf)
+  fm.delete(metadataPath)
+
+  // Restart the query
+  testStream(aggregated, Complete)(
+StartStream(checkpointLocation = checkpointDir.toString),
+AddData(inputData, 3),
+CheckLastBatch((3, 2)),
+StopStream
+  )
+}
+  }
+
+  test("Changing operator - " +
+"replace, add, remove operators will trigger error with debug message") {
+withTempDir { checkpointDir =>
+  val inputData = MemoryStream[Int]
+  val stream = inputData.toDF().withColumn("eventTime", 
timestamp_seconds($"value"))
+
+  testStream(stream.dropDuplicates())(
+StartStream(checkpointLocation = checkpointDir.toString),
+AddData(inputData, 1),
+ProcessAllAvailable(),
+StopStream
+  )
+
+  def checkOpChangeError(OpsInMetadataSeq: Seq[String],

Review Comment:
   Maybe the community has to run the formatter once over the whole codebase. I'm
   not sure scalafmt can deal with all of the styles, though. It is still good to
   get familiar with the Databricks Scala style guide; it doesn't only contain
   styles that automation can handle.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SS][SPARK-46928] Add support for ListState in Arbitrary State API v2. [spark]

2024-02-14 Thread via GitHub


HeartSaVioR commented on code in PR #44961:
URL: https://github.com/apache/spark/pull/44961#discussion_r1490075730


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##
@@ -75,6 +76,61 @@ private[sql] class RocksDBStateStoreProvider
   value
 }
 
+/**
+ * Provides an iterator containing all values of a non-null key.
+ *
+ * Inside RocksDB, the values are merged together and stored as a byte 
Array.
+ * This operation relies on state store value encoder to be able to split 
the
+ * single array into multiple values.
+ *
+ * Also see [[MultiValuedStateEncoder]] which supports encoding/decoding 
multiple
+ * values per key.
+ */
+override def valuesIterator(key: UnsafeRow, colFamilyName: String): 
Iterator[UnsafeRow] = {
+  verify(key != null, "Key cannot be null")
+
+  val kvEncoder = keyValueEncoderMap.get(colFamilyName)
+  val valueEncoder = kvEncoder._2
+  val keyEncoder = kvEncoder._1
+
+  verify(valueEncoder.supportsMultipleValuesPerKey, "valuesIterator 
requires a encoder " +
+  "that supports multiple values for a single key.")
+  val encodedKey = rocksDB.get(keyEncoder.encodeKey(key), colFamilyName)
+  val valueIterator = valueEncoder.decodeValues(encodedKey)
+
+  if (valueIterator.nonEmpty) {
+new Iterator[UnsafeRow] {
+  override def hasNext: Boolean = {
+valueIterator.hasNext
+  }
+
+  override def next(): UnsafeRow = {
+val value = valueIterator.next()
+if (value != null) {
+  StateStoreProvider.validateStateRowFormat(

Review Comment:
   This adds overhead, so we wouldn't want to do it for every value.
   Either we have to sample (that's why we only do it for the first value) or not
   do this at all. The latter is probably OK, since get() also doesn't do this for
   the case where there are multiple CFs.
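   
   If we go with sampling, a generic sketch of the idea (not the actual
   RocksDBStateStoreProvider code; the helper below is made up for illustration):
   
   ```scala
   // Sketch: wrap an iterator so an expensive validation runs only on the first element.
   def validateFirstOnly[T](underlying: Iterator[T])(validate: T => Unit): Iterator[T] =
     new Iterator[T] {
       private var validated = false
       override def hasNext: Boolean = underlying.hasNext
       override def next(): T = {
         val value = underlying.next()
         if (!validated) { validate(value); validated = true }
         value
       }
     }
   ```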



##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ValueStateImpl.scala:
##
@@ -77,9 +50,7 @@ class ValueStateImpl[S](
   override def getOption(): Option[S] = {
 val retRow = getImpl()

Review Comment:
   I know this is beyond the scope of this PR, but while we are here, I don't see
   any difference from `Option(get())`.



##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##
@@ -131,6 +141,15 @@ trait StateStore extends ReadStateStore {
   def remove(key: UnsafeRow,
 colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Unit
 
+  /**
+   * Merges the provided value with existing values of a non-null key.

Review Comment:
   Maybe we want to explicitly document the contract for the behavior when
   performing merge against a non-existing key, since all implementations should
   provide the same behavior.
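   
   For example, something along these lines (the wording is only a suggestion, and
   the merge signature below is assumed to mirror the other StateStore methods
   rather than copied from the PR):
   
   ```scala
   /**
    * Merges the provided value with the existing values of a non-null key.
    * Proposed contract: if the key does not exist yet, merge behaves like put and the
    * value becomes the first entry for that key; repeated merges accumulate values
    * that are later returned together by valuesIterator.
    */
   def merge(
       key: UnsafeRow,
       value: UnsafeRow,
       colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Unit
   ```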



##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##
@@ -67,6 +67,15 @@ trait ReadStateStore {
   def get(key: UnsafeRow,
 colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME): UnsafeRow
 
+  /**
+   * Provides an iterator containing all values of a non-null key.

Review Comment:
   Let's define a contract for the expected return value, especially clarifying
   what is returned for a non-existing key. Multiple implementations should follow
   the contract, so this needs to be clearly explained.



##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ListStateImpl.scala:
##
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import 
org.apache.spark.sql.execution.streaming.StateKeyValueRowSchema.{KEY_ROW_SCHEMA,
 VALUE_ROW_SCHEMA}
+import org.apache.spark.sql.execution.streaming.state.{StateStore, 
StateStoreErrors}
+import org.apache.spark.sql.streaming.ListState
+
+/**
+ * Provides concrete implementation for list of values associated with a state 
variable
+ * used in the streaming transformWithState operator.
+ *
+ * @param store - reference to the St

Re: [PR] [SPARK-46906][SS] Add a check for stateful operator change for streaming [spark]

2024-02-14 Thread via GitHub


jingz-db commented on PR #44927:
URL: https://github.com/apache/spark/pull/44927#issuecomment-1944494272

   Thanks Jungtaek for your thorough code review!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SS][SPARK-46928] Add support for ListState in Arbitrary State API v2. [spark]

2024-02-14 Thread via GitHub


sahnib commented on code in PR #44961:
URL: https://github.com/apache/spark/pull/44961#discussion_r1489986029


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ListStateImpl.scala:
##
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.execution.streaming.state.{StateStore, 
StateStoreErrors}
+import org.apache.spark.sql.streaming.ListState
+
+/**
+ * Provides concrete implementation for list of values associated with a state 
variable
+ * used in the streaming transformWithState operator.
+ *
+ * @param store - reference to the StateStore instance to be used for storing 
state
+ * @param stateName - name of logical state partition
+ * @tparam S - data type of object that will be stored in the list
+ */
+class ListStateImpl[S](store: StateStore,
+ stateName: String,
+ keyExprEnc: ExpressionEncoder[Any])
+  extends ListState[S] with Logging {
+
+   /** Whether state exists or not. */
+   override def exists(): Boolean = {
+ val encodedGroupingKey = 
StateTypesEncoderUtils.encodeGroupingKey(stateName, keyExprEnc)
+ val stateValue = store.get(encodedGroupingKey, stateName)
+ stateValue != null
+   }
+
+   /** Get the state value if it exists. If the state does not exist in state 
store, an
+* empty iterator is returned. */
+   override def get(): Iterator[S] = {
+ val encodedKey = StateTypesEncoderUtils.encodeGroupingKey(stateName, 
keyExprEnc)
+ val unsafeRowValuesIterator = store.valuesIterator(encodedKey, stateName)
+ new Iterator[S] {
+   override def hasNext: Boolean = {
+ unsafeRowValuesIterator.hasNext
+   }
+
+   override def next(): S = {
+ val valueUnsafeRow = unsafeRowValuesIterator.next()
+ StateTypesEncoderUtils.decodeValue(valueUnsafeRow)
+   }
+ }
+   }
+
+   /** Get the list value as an option if it exists and None otherwise. */
+   override def getOption(): Option[Iterator[S]] = {
+ Option(get())
+   }
+
+   /** Update the value of the list. */
+   override def put(newState: Array[S]): Unit = {
+ validateNewState(newState)
+
+ if (newState.isEmpty) {
+   this.clear()

Review Comment:
   Added validation for put/appendList to ensure empty lists cannot be passed.
   Passing an empty list now throws a user-facing error. I have also removed
   `getOption` as discussed.
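   
   For reference, a minimal sketch of what that validation could look like (the
   method name and the exception type here are assumptions, not the exact PR code):
   
   ```scala
   // Sketch: reject null/empty lists up front so put()/appendList() never silently clear state.
   private def validateNewState[S](stateName: String, newState: Array[S]): Unit = {
     if (newState == null || newState.isEmpty) {
       throw new IllegalArgumentException(
         s"ListState '$stateName' cannot be updated with a null or empty list")
     }
   }
   ```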



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46906][SS] Add a check for stateful operator change for streaming [spark]

2024-02-14 Thread via GitHub


jingz-db commented on code in PR #44927:
URL: https://github.com/apache/spark/pull/44927#discussion_r1489982761


##
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadataSuite.scala:
##
@@ -215,4 +216,117 @@ class OperatorStateMetadataSuite extends StreamTest with 
SharedSparkSession {
 checkError(exc, "STDS_REQUIRED_OPTION_UNSPECIFIED", "42601",
   Map("optionName" -> StateSourceOptions.PATH))
   }
+
+  test("Operator metadata path non-existence should not fail query") {
+withTempDir { checkpointDir =>
+  val inputData = MemoryStream[Int]
+  val aggregated =
+inputData.toDF()
+  .groupBy($"value")
+  .agg(count("*"))
+  .as[(Int, Long)]
+
+  testStream(aggregated, Complete)(
+StartStream(checkpointLocation = checkpointDir.toString),
+AddData(inputData, 3),
+CheckLastBatch((3, 1)),
+StopStream
+  )
+
+  // Delete operator metadata path
+  val metadataPath = new Path(checkpointDir.toString, 
s"state/0/_metadata/metadata")
+  val fm = CheckpointFileManager.create(new 
Path(checkpointDir.getCanonicalPath), hadoopConf)
+  fm.delete(metadataPath)
+
+  // Restart the query
+  testStream(aggregated, Complete)(
+StartStream(checkpointLocation = checkpointDir.toString),
+AddData(inputData, 3),
+CheckLastBatch((3, 2)),
+StopStream
+  )
+}
+  }
+
+  test("Changing operator - " +
+"replace, add, remove operators will trigger error with debug message") {
+withTempDir { checkpointDir =>
+  val inputData = MemoryStream[Int]
+  val stream = inputData.toDF().withColumn("eventTime", 
timestamp_seconds($"value"))
+
+  testStream(stream.dropDuplicates())(
+StartStream(checkpointLocation = checkpointDir.toString),
+AddData(inputData, 1),
+ProcessAllAvailable(),
+StopStream
+  )
+
+  def checkOpChangeError(OpsInMetadataSeq: Seq[String],

Review Comment:
   Do we have any automation tool for checking this other than `./dev/scalafmt`?
   This command is listed on the [spark developer tool wiki](https://spark.apache.org/developer-tools.html#:~:text=the%20style%20guide.-,Formatting%20code,-To%20format%20Scala),
   and is actually quite messy - it will touch all existing files rather than only
   formatting my code change.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46906][SS] Add a check for stateful operator change for streaming [spark]

2024-02-14 Thread via GitHub


jingz-db commented on code in PR #44927:
URL: https://github.com/apache/spark/pull/44927#discussion_r1489977792


##
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadataSuite.scala:
##
@@ -215,4 +216,117 @@ class OperatorStateMetadataSuite extends StreamTest with 
SharedSparkSession {
 checkError(exc, "STDS_REQUIRED_OPTION_UNSPECIFIED", "42601",
   Map("optionName" -> StateSourceOptions.PATH))
   }
+
+  test("Operator metadata path non-existence should not fail query") {
+withTempDir { checkpointDir =>
+  val inputData = MemoryStream[Int]
+  val aggregated =
+inputData.toDF()
+  .groupBy($"value")
+  .agg(count("*"))
+  .as[(Int, Long)]
+
+  testStream(aggregated, Complete)(
+StartStream(checkpointLocation = checkpointDir.toString),
+AddData(inputData, 3),
+CheckLastBatch((3, 1)),
+StopStream
+  )
+
+  // Delete operator metadata path
+  val metadataPath = new Path(checkpointDir.toString, 
s"state/0/_metadata/metadata")
+  val fm = CheckpointFileManager.create(new 
Path(checkpointDir.getCanonicalPath), hadoopConf)
+  fm.delete(metadataPath)
+
+  // Restart the query
+  testStream(aggregated, Complete)(
+StartStream(checkpointLocation = checkpointDir.toString),
+AddData(inputData, 3),
+CheckLastBatch((3, 2)),
+StopStream
+  )
+}
+  }
+
+  test("Changing operator - " +
+"replace, add, remove operators will trigger error with debug message") {
+withTempDir { checkpointDir =>
+  val inputData = MemoryStream[Int]
+  val stream = inputData.toDF().withColumn("eventTime", 
timestamp_seconds($"value"))
+
+  testStream(stream.dropDuplicates())(
+StartStream(checkpointLocation = checkpointDir.toString),
+AddData(inputData, 1),
+ProcessAllAvailable(),
+StopStream
+  )
+
+  def checkOpChangeError(OpsInMetadataSeq: Seq[String],
+ OpsInCurBatchSeq: Seq[String],
+ ex: Throwable): Unit = {
+checkError(ex.asInstanceOf[SparkRuntimeException],
+  "STREAMING_STATEFUL_OPERATOR_NOT_MATCH_IN_STATE_METADATA", "42K03",
+  Map("OpsInMetadataSeq" -> OpsInMetadataSeq.mkString(", "),
+"OpsInCurBatchSeq" -> OpsInCurBatchSeq.mkString(", "))
+)
+  }
+
+  // replace dropDuplicates with dropDuplicatesWithinWatermark
+  testStream(stream.withWatermark("eventTime", "10 seconds")
+.dropDuplicatesWithinWatermark(), Append)(
+StartStream(checkpointLocation = checkpointDir.toString),
+AddData(inputData, 2),
+ExpectFailure[SparkRuntimeException] {
+  (t: Throwable) => {
+checkOpChangeError(Seq("dedupe"), Seq("dedupeWithinWatermark"), t)
+  }
+}
+  )
+
+  // replace operator
+  testStream(stream.groupBy("value").count(), Update)(
+StartStream(checkpointLocation = checkpointDir.toString),
+AddData(inputData, 3),
+ExpectFailure[SparkRuntimeException] {
+  (t: Throwable) => {
+checkOpChangeError(Seq("dedupe"), Seq("stateStoreSave"), t)
+  }
+}
+  )
+
+  // add operator
+  testStream(stream.dropDuplicates()
+.groupBy("value").count(), Update)(
+StartStream(checkpointLocation = checkpointDir.toString),
+AddData(inputData, 3),
+ExpectFailure[SparkRuntimeException] {
+  (t: Throwable) => {
+checkOpChangeError(Seq("dedupe"), Seq("stateStoreSave", "dedupe"), 
t)
+  }
+}
+  )
+
+  // remove operator

Review Comment:
   > Do we disallow stateful query to be stateless?
   
   We don't allow it even before adding the operator check. Streaming will throw an
   error with a message like "state path not found".
   
   > E.g. could you simply test the removal of stateful operator with 
checkpointDir rather than spinning up another checkpoint?
   
   Done. Restarting a stateless query from a stateful query's checkpoint will now
   trigger an error with the following message:
   ```bash
   [STREAMING_STATEFUL_OPERATOR_NOT_MATCH_IN_STATE_METADATA] Streaming stateful 
operator name does not match with the operator in state metadata. This likely 
to happen when user adds/removes/changes stateful operator of existing 
streaming query.
   Stateful operators in the metadata: [(OperatorId: 0 -> OperatorName: 
dedupe)]; Stateful operators in current batch: []. SQLSTATE: 42K03
   
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional comma

Re: [PR] [SPARK-47045][SQL] Replace `IllegalArgumentException` by `SparkIllegalArgumentException` in `sql/api` [spark]

2024-02-14 Thread via GitHub


MaxGekk commented on code in PR #45098:
URL: https://github.com/apache/spark/pull/45098#discussion_r1489927146


##
common/utils/src/main/resources/error/error-classes.json:
##
@@ -7767,6 +7767,76 @@
   "Single backslash is prohibited. It has special meaning as beginning of 
an escape sequence. To get the backslash character, pass a string with two 
backslashes as the delimiter."
 ]
   },
+  "_LEGACY_ERROR_TEMP_3249" : {
+"message" : [
+  "Failed to convert value  (class of }) with the type 
of  to JSON."
+]
+  },
+  "_LEGACY_ERROR_TEMP_3250" : {
+"message" : [
+  "Failed to convert the JSON string '' to a field."
+]
+  },
+  "_LEGACY_ERROR_TEMP_3251" : {
+"message" : [
+  "Failed to convert the JSON string '' to a data type."
+]
+  },
+  "_LEGACY_ERROR_TEMP_3252" : {
+"message" : [
+  " does not exist. Available: "
+]
+  },
+  "_LEGACY_ERROR_TEMP_3253" : {
+"message" : [
+  " do(es) not exist. Available: "
+]
+  },
+  "_LEGACY_ERROR_TEMP_3254" : {
+"message" : [
+  " does not exist. Available: "
+]
+  },
+  "_LEGACY_ERROR_TEMP_3255" : {
+"message" : [
+  "Error parsing '' to interval, "
+]
+  },
+  "_LEGACY_ERROR_TEMP_3256" : {
+"message" : [
+  "Unrecognized datetime pattern: "
+]
+  },
+  "_LEGACY_ERROR_TEMP_3257" : {
+"message" : [
+  "All week-based patterns are unsupported since Spark 3.0, detected: , 
Please use the SQL function EXTRACT instead"

Review Comment:
   I preserved the punctuation of the original error intentionally:
   
https://github.com/apache/spark/pull/45098/files#diff-1ada8897c412e27c3f73c8f5449f62f1fdc805b979cc7cfa35fcf8ad031529bbL313
   so as not to break existing tests. We should improve the errors while assigning
   proper error class names and writing tests for the errors.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47045][SQL] Replace `IllegalArgumentException` by `SparkIllegalArgumentException` in `sql/api` [spark]

2024-02-14 Thread via GitHub


xinrong-meng commented on code in PR #45098:
URL: https://github.com/apache/spark/pull/45098#discussion_r1489921713


##
common/utils/src/main/resources/error/error-classes.json:
##
@@ -7767,6 +7767,76 @@
   "Single backslash is prohibited. It has special meaning as beginning of 
an escape sequence. To get the backslash character, pass a string with two 
backslashes as the delimiter."
 ]
   },
+  "_LEGACY_ERROR_TEMP_3249" : {
+"message" : [
+  "Failed to convert value  (class of }) with the type 
of  to JSON."
+]
+  },
+  "_LEGACY_ERROR_TEMP_3250" : {
+"message" : [
+  "Failed to convert the JSON string '' to a field."
+]
+  },
+  "_LEGACY_ERROR_TEMP_3251" : {
+"message" : [
+  "Failed to convert the JSON string '' to a data type."
+]
+  },
+  "_LEGACY_ERROR_TEMP_3252" : {
+"message" : [
+  " does not exist. Available: "
+]
+  },
+  "_LEGACY_ERROR_TEMP_3253" : {
+"message" : [
+  " do(es) not exist. Available: "
+]
+  },
+  "_LEGACY_ERROR_TEMP_3254" : {
+"message" : [
+  " does not exist. Available: "
+]
+  },
+  "_LEGACY_ERROR_TEMP_3255" : {
+"message" : [
+  "Error parsing '' to interval, "
+]
+  },
+  "_LEGACY_ERROR_TEMP_3256" : {
+"message" : [
+  "Unrecognized datetime pattern: "
+]
+  },
+  "_LEGACY_ERROR_TEMP_3257" : {
+"message" : [
+  "All week-based patterns are unsupported since Spark 3.0, detected: , 
Please use the SQL function EXTRACT instead"

Review Comment:
   nit: punctuation in `detected: , Please` could be improved, feel free to 
ignore though :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[PR] [SPARK-47049][BUILD] Ban non-shaded Hadoop dependencies [spark]

2024-02-14 Thread via GitHub


dongjoon-hyun opened a new pull request, #45106:
URL: https://github.com/apache/spark/pull/45106

   
   
   ### What changes were proposed in this pull request?
   
   
   
   ### Why are the changes needed?
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   
   ### How was this patch tested?
   
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47014][PYTHON][CONNECT] Implement methods dumpPerfProfiles and dumpMemoryProfiles of SparkSession [spark]

2024-02-14 Thread via GitHub


xinrong-meng commented on PR #45073:
URL: https://github.com/apache/spark/pull/45073#issuecomment-1944388920

   Merged to master, thank you!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47014][PYTHON][CONNECT] Implement methods dumpPerfProfiles and dumpMemoryProfiles of SparkSession [spark]

2024-02-14 Thread via GitHub


xinrong-meng closed pull request #45073: [SPARK-47014][PYTHON][CONNECT] 
Implement methods dumpPerfProfiles and dumpMemoryProfiles of SparkSession
URL: https://github.com/apache/spark/pull/45073


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SS][SPARK-46928] Add support for ListState in Arbitrary State API v2. [spark]

2024-02-14 Thread via GitHub


anishshri-db commented on code in PR #44961:
URL: https://github.com/apache/spark/pull/44961#discussion_r1489907739


##
sql/api/src/main/scala/org/apache/spark/sql/streaming/ValueState.scala:
##
@@ -46,5 +46,5 @@ private[sql] trait ValueState[S] extends Serializable {
   def update(newState: S): Unit
 
   /** Remove this state. */
-  def remove(): Unit
+  def clear(): Unit

Review Comment:
   Yes - correct, intentional change



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SS][SPARK-46928] Add support for ListState in Arbitrary State API v2. [spark]

2024-02-14 Thread via GitHub


anishshri-db commented on code in PR #44961:
URL: https://github.com/apache/spark/pull/44961#discussion_r1489903465


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ListStateImpl.scala:
##
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.execution.streaming.state.{StateStore, 
StateStoreErrors}
+import org.apache.spark.sql.streaming.ListState
+
+/**
+ * Provides concrete implementation for list of values associated with a state 
variable
+ * used in the streaming transformWithState operator.
+ *
+ * @param store - reference to the StateStore instance to be used for storing 
state
+ * @param stateName - name of logical state partition
+ * @tparam S - data type of object that will be stored in the list
+ */
+class ListStateImpl[S](store: StateStore,
+ stateName: String,
+ keyExprEnc: ExpressionEncoder[Any])
+  extends ListState[S] with Logging {
+
+   /** Whether state exists or not. */
+   override def exists(): Boolean = {
+ val encodedGroupingKey = 
StateTypesEncoderUtils.encodeGroupingKey(stateName, keyExprEnc)
+ val stateValue = store.get(encodedGroupingKey, stateName)
+ stateValue != null
+   }
+
+   /** Get the state value if it exists. If the state does not exist in state 
store, an
+* empty iterator is returned. */
+   override def get(): Iterator[S] = {
+ val encodedKey = StateTypesEncoderUtils.encodeGroupingKey(stateName, 
keyExprEnc)
+ val unsafeRowValuesIterator = store.valuesIterator(encodedKey, stateName)
+ new Iterator[S] {
+   override def hasNext: Boolean = {
+ unsafeRowValuesIterator.hasNext
+   }
+
+   override def next(): S = {
+ val valueUnsafeRow = unsafeRowValuesIterator.next()
+ StateTypesEncoderUtils.decodeValue(valueUnsafeRow)
+   }
+ }
+   }
+
+   /** Get the list value as an option if it exists and None otherwise. */
+   override def getOption(): Option[Iterator[S]] = {
+ Option(get())
+   }
+
+   /** Update the value of the list. */
+   override def put(newState: Array[S]): Unit = {
+ validateNewState(newState)
+
+ if (newState.isEmpty) {
+   this.clear()

Review Comment:
   Discussed with @sahnib offline and we think it's better not to allow passing an empty 
list to the `put` call here. So we will explicitly throw an exception in that case; for 
`get`, we will return an empty iterator when the value does not exist and a non-empty 
iterator otherwise. We will also remove `getOption` for this case since its use would be 
moot (rough sketch below).
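   A minimal, self-contained sketch of those semantics (the class name and message are 
illustrative only, not the final API):
   
   ```scala
   // Sketch of the agreed behavior: put() rejects an empty list, get() returns an
   // empty iterator when nothing is stored, so no separate getOption() is needed.
   class ListStateSketch[S](stateName: String) {
     private var values: Seq[S] = Seq.empty

     def put(newState: Array[S]): Unit = {
       if (newState == null || newState.isEmpty) {
         // mirrors the "explicitly throw an exception" decision above
         throw new IllegalArgumentException(
           s"Cannot update list state '$stateName' with an empty list")
       }
       values = newState.toSeq
     }

     def get(): Iterator[S] = values.iterator // empty iterator when no value exists
   }
   ```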



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46962][SS][PYTHON] Add interface for python streaming data source API and implement python worker to run python streaming data source [spark]

2024-02-14 Thread via GitHub


chaoqin-li1123 commented on code in PR #45023:
URL: https://github.com/apache/spark/pull/45023#discussion_r1489903039


##
python/pyspark/sql/datasource.py:
##
@@ -298,6 +320,104 @@ def read(self, partition: InputPartition) -> 
Iterator[Union[Tuple, Row]]:
 ...
 
 
+class DataSourceStreamReader(ABC):
+"""
+A base class for streaming data source readers. Data source stream readers 
are responsible
+for outputting data from a streaming data source.
+
+.. versionadded: 4.0.0
+"""
+
+def initialOffset(self) -> dict:
+"""
+Return the initial offset of the streaming data source.
+A new streaming query starts reading data from the initial offset.
+If Spark is restarting an existing query, it will restart from the 
check-pointed offset
+rather than the initial one.
+
+Returns
+---
+dict
+A dict whose key and values are str type.

Review Comment:
   The serialization is easy, but deserialization will be tricky. We don't have runtime 
type information to deserialize plain JSON text back into a Python object (we don't even 
know which Python type the JSON was serialized from unless we keep extra type information 
in the JSON).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47043][BUILD] add `jackson-core` and `jackson-annotations` dependencies to module `spark-common-utils` [spark]

2024-02-14 Thread via GitHub


dongjoon-hyun commented on PR #45103:
URL: https://github.com/apache/spark/pull/45103#issuecomment-1944373538

   No problem. I just wanted to inform you. I fixed the field by removing my 
name. :) 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47043][BUILD] add `jackson-core` and `jackson-annotations` dependencies to module `spark-common-utils` [spark]

2024-02-14 Thread via GitHub


William1104 commented on PR #45103:
URL: https://github.com/apache/spark/pull/45103#issuecomment-1944365291

   Hi @dongjoon-hyun 
   
   I am sorry about that. I created the JIRA via cloning. I don't know how to 
update the assignee back to myself. Would you mind changing it back to me, or 
letting me know how I can update the assignee? 
   
   Best regards,
   William


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-43259][SQL] Assign a name to the error class _LEGACY_ERROR_TEMP_2024 [spark]

2024-02-14 Thread via GitHub


MaxGekk commented on code in PR #45095:
URL: https://github.com/apache/spark/pull/45095#discussion_r1489876902


##
sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala:
##
@@ -1151,6 +1153,21 @@ class QueryExecutionErrorsSuite
   )
 )
   }
+
+  test("SPARK-43259: Uses unsupported custom encoder") {
+class CustomEncoder extends Encoder[Int] {
+  override def schema: StructType = StructType(Array.empty[StructField])
+  override def clsTag: ClassTag[Int] = ClassTag.Int
+}
+val e = intercept[SparkRuntimeException] {
+  encoderFor(new CustomEncoder)

Review Comment:
   Can you try to reproduce the issue using public Spark SQL API (sql or 
Java/Scala/Python api)? If it is not possible, we should convert the error to 
an internal one.
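   For reference, a rough sketch of the "internal error" alternative, assuming 
`SparkException.internalError` (available in recent Spark versions); the helper name and 
message below are illustrative, not the actual change being requested:
   
   ```scala
   import org.apache.spark.SparkException
   import org.apache.spark.sql.Encoder

   // Hypothetical helper: raise INTERNAL_ERROR instead of a user-facing error class
   // when the path cannot be reached through public APIs.
   def unsupportedEncoder(enc: Encoder[_]): Throwable =
     SparkException.internalError(
       s"Only expression encoders are supported here, but got: ${enc.getClass.getName}")
   ```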



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47015][Collation] Disable partitioning on collated columns [spark]

2024-02-14 Thread via GitHub


MaxGekk commented on code in PR #45104:
URL: https://github.com/apache/spark/pull/45104#discussion_r1489873175


##
sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala:
##
@@ -174,4 +174,36 @@ class CollationSuite extends QueryTest with 
SharedSparkSession {
   Row(expected))
 }
   }
+
+
+  test("disable partition on collated string column") {
+def createTable(partitionColumns: String*): Unit = {
+  val tableName = "test_partition_tbl"
+  withTable(tableName) {
+sql(
+  s"""
+ |CREATE TABLE $tableName
+ |(id INT, c1 STRING COLLATE 'UNICODE', c2 string)
+ |USING parquet
+ |PARTITIONED BY (${partitionColumns.mkString(",")})
+ |""".stripMargin)
+  }
+
+  // should work fine on non collated columns
+  createTable("id")
+  createTable("c2")
+  createTable("id", "c2")
+
+  Seq(Seq("c1"), Seq("c1", "id"), Seq("c1", "c2")).foreach { 
partitionColumns =>
+checkError(
+  exception = intercept[AnalysisException] {
+createTable(partitionColumns: _*)
+  },
+  errorClass = "_LEGACY_ERROR_TEMP_1153",

Review Comment:
   While you are here, could you assign a proper name to the error class and 
improve the error message format (if needed)?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47038][BUILD] Remove shaded `protobuf-java` 2.6.1 dependency from `kinesis-asl-assembly` [spark]

2024-02-14 Thread via GitHub


dongjoon-hyun closed pull request #45096: [SPARK-47038][BUILD] Remove shaded 
`protobuf-java` 2.6.1 dependency from `kinesis-asl-assembly`
URL: https://github.com/apache/spark/pull/45096


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47038][BUILD] Remove shaded `protobuf-java` 2.6.1 dependency from `kinesis-asl-assembly` [spark]

2024-02-14 Thread via GitHub


viirya commented on code in PR #45096:
URL: https://github.com/apache/spark/pull/45096#discussion_r1489871042


##
connector/kinesis-asl-assembly/pom.xml:
##
@@ -59,16 +59,6 @@
   commons-lang
   provided
 
-
-  com.google.protobuf
-  protobuf-java
-  2.6.1
-  
-

Review Comment:
   Oh, I see. It was overriding the protobuf version.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47038][BUILD] Remove shaded `protobuf-java` 2.6.1 dependency from `kinesis-asl-assembly` [spark]

2024-02-14 Thread via GitHub


dongjoon-hyun commented on PR #45096:
URL: https://github.com/apache/spark/pull/45096#issuecomment-1944333969

   Thank you so much, @viirya !


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47043][BUILD] add `jackson-core` and `jackson-annotations` dependencies to module `spark-common-utils` [spark]

2024-02-14 Thread via GitHub


dongjoon-hyun commented on PR #45103:
URL: https://github.com/apache/spark/pull/45103#issuecomment-1944331715

   BTW, @William1104, please don't borrow someone else's name like this.
   
   ![Screenshot 2024-02-14 at 09 59 
56](https://github.com/apache/spark/assets/9700541/9dfafa5c-6335-4450-998e-6832c66f8b6b)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47038][BUILD] Remove shaded `protobuf-java` 2.6.1 dependency from `kinesis-asl-assembly` [spark]

2024-02-14 Thread via GitHub


dongjoon-hyun commented on code in PR #45096:
URL: https://github.com/apache/spark/pull/45096#discussion_r1489867281


##
connector/kinesis-asl-assembly/pom.xml:
##
@@ -59,16 +59,6 @@
   commons-lang
   provided
 
-
-  com.google.protobuf
-  protobuf-java
-  2.6.1
-  
-

Review Comment:
   Yes, it's still a `provided` dependency, as I described in the PR description.
   ![Screenshot 2024-02-14 at 09 58 
01](https://github.com/apache/spark/assets/9700541/b7e829a5-f1cc-47d4-85b4-19cedcf0e46b)
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47038][BUILD] Remove shaded `protobuf-java` 2.6.1 dependency from `kinesis-asl-assembly` [spark]

2024-02-14 Thread via GitHub


dongjoon-hyun commented on code in PR #45096:
URL: https://github.com/apache/spark/pull/45096#discussion_r1489867281


##
connector/kinesis-asl-assembly/pom.xml:
##
@@ -59,16 +59,6 @@
   commons-lang
   provided
 
-
-  com.google.protobuf
-  protobuf-java
-  2.6.1
-  
-

Review Comment:
   Yes, it's a `provided` dependency now, as I described in the PR description.
   ![Screenshot 2024-02-14 at 09 58 
01](https://github.com/apache/spark/assets/9700541/b7e829a5-f1cc-47d4-85b4-19cedcf0e46b)
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SS][SPARK-46928] Add support for ListState in Arbitrary State API v2. [spark]

2024-02-14 Thread via GitHub


anishshri-db commented on code in PR #44961:
URL: https://github.com/apache/spark/pull/44961#discussion_r1489865483


##
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala:
##
@@ -78,7 +78,7 @@ class StatePartitionReader(
 
 StateStoreProvider.createAndInit(
   stateStoreProviderId, keySchema, valueSchema, numColsPrefixKey,
-  useColumnFamilies = false, storeConf, hadoopConf.value)
+  useColumnFamilies = false, storeConf, hadoopConf.value, 
useMultipleValuesPerKey = false)

Review Comment:
   Filed the ticket here - https://issues.apache.org/jira/browse/SPARK-47047



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47038][BUILD] Remove shaded `protobuf-java` 2.6.1 dependency from `kinesis-asl-assembly` [spark]

2024-02-14 Thread via GitHub


viirya commented on code in PR #45096:
URL: https://github.com/apache/spark/pull/45096#discussion_r1489864752


##
connector/kinesis-asl-assembly/pom.xml:
##
@@ -59,16 +59,6 @@
   commons-lang
   provided
 
-
-  com.google.protobuf
-  protobuf-java
-  2.6.1
-  
-

Review Comment:
   Before SPARK-14421, it was a provided dependency. Don't we need to keep it?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SS][SPARK-46928] Add support for ListState in Arbitrary State API v2. [spark]

2024-02-14 Thread via GitHub


anishshri-db commented on code in PR #44961:
URL: https://github.com/apache/spark/pull/44961#discussion_r1489863110


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##
@@ -65,6 +65,43 @@ private[sql] class RocksDBStateStoreProvider
   value
 }
 
+override def valuesIterator(key: UnsafeRow, colFamilyName: String): 
Iterator[UnsafeRow] = {
+  verify(key != null, "Key cannot be null")
+  verify(encoder.supportsMultipleValuesPerKey, "valuesIterator requires a 
encoder " +
+  "that supports multiple values for a single key.")
+  val valueIterator = 
encoder.decodeValues(rocksDB.get(encoder.encodeKey(key), colFamilyName))
+
+  if (!isValidated && valueIterator.nonEmpty) {

Review Comment:
   @HeartSaVioR - is it ok to remove this check everywhere? I can create the 
change in a separate PR. I don't think this is giving us much currently.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47038][BUILD] Remove shaded `protobuf-java` 2.6.1 dependency from `kinesis-asl-assembly` [spark]

2024-02-14 Thread via GitHub


viirya commented on PR #45096:
URL: https://github.com/apache/spark/pull/45096#issuecomment-1944317618

   Looking into this.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47038][BUILD] Remove shaded `protobuf-java` 2.6.1 dependency from `kinesis-asl-assembly` [spark]

2024-02-14 Thread via GitHub


dongjoon-hyun commented on PR #45096:
URL: https://github.com/apache/spark/pull/45096#issuecomment-1944313907

   Could you review this, please, @viirya ?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47038][BUILD] Remove shaded `protobuf-java` 2.6.1 dependency from `kinesis-asl-assembly` [spark]

2024-02-14 Thread via GitHub


dongjoon-hyun commented on PR #45096:
URL: https://github.com/apache/spark/pull/45096#issuecomment-1944313704

   All tests passed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47036][SS] Cleanup RocksDB file tracking for previously uploaded files if files were deleted from local directory [spark]

2024-02-14 Thread via GitHub


sahnib commented on code in PR #45092:
URL: https://github.com/apache/spark/pull/45092#discussion_r1489848282


##
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##
@@ -1863,6 +1864,91 @@ class RocksDBSuite extends 
AlsoTestWithChangelogCheckpointingEnabled with Shared
 }
   }
 
+  test("ensure local files deleted on filesystem" +
+" are cleaned from dfs file mapping") {
+def getSSTFiles(dir: File): Set[File] = {
+  val sstFiles = new mutable.HashSet[File]()
+  dir.listFiles().foreach { f =>
+if (f.isDirectory) {
+  sstFiles ++= getSSTFiles(f)
+} else {
+  if (f.getName.endsWith(".sst")) {
+sstFiles.add(f)
+  }
+}
+  }
+  sstFiles.toSet
+}
+
+def filterAndDeleteSSTFiles(dir: File, filesToKeep: Set[File]): Unit = {
+  dir.listFiles().foreach { f =>
+if (f.isDirectory) {
+  filterAndDeleteSSTFiles(f, filesToKeep)
+} else {
+  if (!filesToKeep.contains(f) && f.getName.endsWith(".sst")) {
+logInfo(s"deleting ${f.getAbsolutePath} from local directory")
+f.delete()
+  }
+}
+  }
+}
+
+withTempDir { dir =>
+  withTempDir { localDir =>
+val sqlConf = new SQLConf()
+val dbConf = RocksDBConf(StateStoreConf(sqlConf))
+logInfo(s"config set to ${dbConf.compactOnCommit}")
+val hadoopConf = new Configuration()
+val remoteDir = dir.getCanonicalPath
+withDB(remoteDir = remoteDir,
+  conf = dbConf,
+  hadoopConf = hadoopConf,
+  localDir = localDir) { db =>
+  db.load(0)
+  db.put("a", "1")
+  db.put("b", "1")
+  db.commit()
+  // upload snapshots (with changelog checkpointing)

Review Comment:
   The maintenance operation creates a snapshot only if changelog checkpointing 
is enabled. However, I agree that the comment is confusing because the 
test runs both with and without changelog checkpointing. Further, the state store does 
create snapshots on commit based on minDeltasForSnapshots (with changelog 
checkpointing enabled). I have removed the comment as I think it adds more 
confusion than it helps. 
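   For anyone following along, a rough sketch of how a test can pin the snapshot cadence 
explicitly instead of relying on a comment; the config keys are my assumption of the 
current names, so please verify against `SQLConf` and the RocksDB provider docs on your 
branch:
   
   ```scala
   // Sketch only: config key names assumed, not verified against this branch.
   val sqlConf = new SQLConf()
   // Create a snapshot after every delta so snapshot-related behavior shows up quickly.
   sqlConf.setConfString("spark.sql.streaming.stateStore.minDeltasForSnapshot", "1")
   // Exercise the changelog checkpointing path explicitly rather than via the suite default.
   sqlConf.setConfString(
     "spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")
   val dbConf = RocksDBConf(StateStoreConf(sqlConf))
   ```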



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47039][TESTS] Add a checkstyle rule to ban `commons-lang` in Java code [spark]

2024-02-14 Thread via GitHub


dongjoon-hyun closed pull request #45097: [SPARK-47039][TESTS] Add a checkstyle 
rule to ban `commons-lang` in Java code
URL: https://github.com/apache/spark/pull/45097


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47039][TESTS] Add a checkstyle rule to ban `commons-lang` in Java code [spark]

2024-02-14 Thread via GitHub


dongjoon-hyun commented on PR #45097:
URL: https://github.com/apache/spark/pull/45097#issuecomment-1944303256

   Thank you so much, @huaxingao ! Merged to master.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47042][BUILD] add missing explicit dependency 'commons-lang3' to the module 'spark-common-utils' [spark]

2024-02-14 Thread via GitHub


dongjoon-hyun commented on PR #45101:
URL: https://github.com/apache/spark/pull/45101#issuecomment-1944302594

   I agree with you at this point.
   >  By explicitly declaring the dependency, we can avoid any unexpected 
missing dependencies that might occur when upgrading 'commons-text'.
   
   For the following question, since we cannot always enumerate all transitive 
dependencies, Apache Spark established a way to depend on an explicit 
manifest file (of the final Apache Spark distribution) and CI checking:
   - https://github.com/apache/spark/blob/master/dev/test-dependencies.sh
   - https://github.com/apache/spark/blob/master/dev/deps/spark-deps-hadoop-3-hive-2.3
   
   > ... If it is the established practice within the Spark project to address 
dependency issues ...


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47039][TESTS] Add a checkstyle rule to ban `commons-lang` in Java code [spark]

2024-02-14 Thread via GitHub


huaxingao commented on PR #45097:
URL: https://github.com/apache/spark/pull/45097#issuecomment-1944296112

   LGTM. Thanks for the PR!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[PR] [SPARK-47009][Collation] Enable create table support for collation [spark]

2024-02-14 Thread via GitHub


stefankandic opened a new pull request, #45105:
URL: https://github.com/apache/spark/pull/45105

   
   
   ### What changes were proposed in this pull request?
   
   Adding support for create table with collated columns using parquet
   
   ### Why are the changes needed?
   
   In order to support basic DDL operations for collations
   
   ### Does this PR introduce _any_ user-facing change?
   
   Yes, users are now able to create tables with collated columns
   
   ### How was this patch tested?
   
   With UTs
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   No


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47042][BUILD] add missing explicit dependency 'commons-lang3' to the module 'spark-common-utils' [spark]

2024-02-14 Thread via GitHub


William1104 commented on PR #45101:
URL: https://github.com/apache/spark/pull/45101#issuecomment-1944292205

   Hi @dongjoon-hyun 
   
   Thank you for taking the time to review my pull request. I appreciate your 
feedback and would like to address the points you raised.
   
   Regarding the declaration of explicit dependencies, I believe it is a common 
practice to declare dependencies explicitly when a module contains a class that 
directly depends on a specific library. This practice ensures that the 
dependencies are clearly defined and helps mitigate potential issues that may 
arise in the future. While a library like 'commons-text' may currently have 
'commons-lang3' as a transitive dependency, there is no guarantee that this 
relationship will remain unchanged in future releases. By explicitly declaring 
the dependency, we can avoid any unexpected missing dependencies that might 
occur when upgrading 'commons-text'.
   
   However, I understand that different projects may have different approaches 
to handling dependency issues. If it is the established practice within the 
Spark project to address dependency issues only when they block enhancements or 
pose critical problems, I am willing to follow that practice and close this 
pull request accordingly.
   
   Thank you again for your review, and I look forward to any further guidance 
you may provide.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47039][TESTS] Add a checkstyle rule to ban `commons-lang` in Java code [spark]

2024-02-14 Thread via GitHub


dongjoon-hyun commented on PR #45097:
URL: https://github.com/apache/spark/pull/45097#issuecomment-1944262290

   Could you review this PR, please, @huaxingao ?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47039][TESTS] Add a checkstyle rule to ban `commons-lang` in Java code [spark]

2024-02-14 Thread via GitHub


dongjoon-hyun commented on PR #45097:
URL: https://github.com/apache/spark/pull/45097#issuecomment-1944261211

   All tests passed.
   
   ![Screenshot 2024-02-14 at 09 12 
56](https://github.com/apache/spark/assets/9700541/23f0f2ea-aeb1-4c34-907a-3415fdc002d3)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


