[GitHub] [spark] ulysses-you commented on pull request #40262: [SPARK-42651][SQL] Optimize global sort to driver sort

2023-03-02 Thread via GitHub


ulysses-you commented on PR #40262:
URL: https://github.com/apache/spark/pull/40262#issuecomment-1453105982

   @zhengruifeng thank you for your thought.

   The original idea of driver sort is to avoid one shuffle. Requiring `SinglePartition` does not seem to help, since it still requires a shuffle.
   
   Besides, the result eventually goes to the driver anyway, i.e. `df.sort.collect` (that is why I match `ReturnAnswer`), so it should be fine to do the sort at the driver. Also, sorting at the driver is not new to driver sort; e.g. the merge function of `rdd.takeOrdered` already does it. It should be safe as long as the size of the plan is small enough.
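   
   For reference, a minimal sketch of that prior art (plain RDD API usage, assuming a running `spark` session; this is not the proposed `DriverSortExec`):
   ```scala
   // Each partition computes its own top-k; the partial results are then
   // merged, i.e. sorted, on the driver side.
   val rdd = spark.sparkContext.parallelize(Seq(5, 3, 9, 1, 7), numSlices = 3)
   val smallest3 = rdd.takeOrdered(3) // Array(1, 3, 5), merged on the driver
   ```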
   





[GitHub] [spark] mskapilks commented on pull request #40266: [SPARK-42660] Infer filters for Join produced by IN and EXISTS clause (RewritePredicateSubquery rule)

2023-03-02 Thread via GitHub


mskapilks commented on PR #40266:
URL: https://github.com/apache/spark/pull/40266#issuecomment-1453097749

   TPCH q21 plan change
   |Before|After|
   |---|---|
   |![Web capture_3-3-2023_125513_ms web azuresynapse net_crop](https://user-images.githubusercontent.com/106726387/222658668-830d9ddb-2cc2-4013-8dbc-29967d957954.jpeg)|![Web capture_3-3-2023_12324_ms web azuresynapse net_crop](https://user-images.githubusercontent.com/106726387/222658801-f917a4cd-f825-4567-aca8-cb596dd64e51.jpeg)|





[GitHub] [spark] zhengruifeng commented on pull request #40262: [SPARK-42651][SQL] Optimize global sort to driver sort

2023-03-02 Thread via GitHub


zhengruifeng commented on PR #40262:
URL: https://github.com/apache/spark/pull/40262#issuecomment-1453092378

   just an idea: what about making `SortExec` require `SinglePartition`?
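   
   For illustration, a hedged sketch of what that requirement could look like (using Catalyst's physical `Distribution` types; this is not the actual `SortExec` code):
   ```scala
   import org.apache.spark.sql.catalyst.plans.physical.{Distribution, SinglePartition, UnspecifiedDistribution}

   // A global sort could declare SinglePartition as its required child distribution,
   // which would force a shuffle into a single partition before the sort runs.
   def requiredChildDistribution(global: Boolean): Seq[Distribution] =
     if (global) SinglePartition :: Nil else UnspecifiedDistribution :: Nil
   ```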





[GitHub] [spark] zhengruifeng commented on pull request #40259: [SPARK-42609][CONNECT][TESTS] Add tests for grouping() and grouping_id() functions

2023-03-02 Thread via GitHub


zhengruifeng commented on PR #40259:
URL: https://github.com/apache/spark/pull/40259#issuecomment-1453086077

   merged into master/3.4





[GitHub] [spark] beliefer commented on pull request #40252: [SPARK-42555][CONNECT] Add JDBC to DataFrameReader

2023-03-02 Thread via GitHub


beliefer commented on PR #40252:
URL: https://github.com/apache/spark/pull/40252#issuecomment-1453086009

   > @beliefer you can create a test in `PlanGenerationTestSuite`. That will at 
least validate the proto message we are generating, and it will validate that 
plan you are producing yields a valid plan in `ProtoToPlanTestSuite`.
   
   OK. I added the test cases, but they throw
   ```
   org.h2.jdbc.JdbcSQLSyntaxErrorException: Schema "TEST" not found; SQL statement:
   SELECT * FROM TEST.TIMETYPES WHERE 1=0 [90079-214]
   ```
   when `ProtoToParsedPlanTestSuite` validates the golden file. The built-in H2 database runs on the server side, and we can't start an H2 database from the Connect API.
   @hvanhovell Could you tell me how to fix or avoid this?





[GitHub] [spark] zhengruifeng closed pull request #40259: [SPARK-42609][CONNECT][TESTS] Add tests for grouping() and grouping_id() functions

2023-03-02 Thread via GitHub


zhengruifeng closed pull request #40259: [SPARK-42609][CONNECT][TESTS] Add 
tests for grouping() and grouping_id() functions
URL: https://github.com/apache/spark/pull/40259





[GitHub] [spark] mskapilks opened a new pull request, #40266: [SPARK-42660] Infer filters for Join produced by IN and EXISTS clause (RewritePredicateSubquery rule)

2023-03-02 Thread via GitHub


mskapilks opened a new pull request, #40266:
URL: https://github.com/apache/spark/pull/40266

   
   ### What changes were proposed in this pull request?
   We should run `InferFiltersFromConstraints` again after the `RewritePredicateSubquery` rule. `RewritePredicateSubquery` rewrites IN and EXISTS queries into LEFT SEMI/LEFT ANTI joins, but we don't infer filters for these newly generated joins. In TPCH 1TB q21 we noticed that inferring filters for these new joins removes one `lineitem` table scan, because a `ReusedExchange` gets introduced. Previously the reuse did not happen due to a mismatch in filter predicates.
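   
   For illustration, a hedged sketch of the rewrite and the filters that can then be inferred (table and column names here are illustrative, not the actual q21 plan):
   ```scala
   // IN subquery before optimization (assumes a running `spark` session):
   val df = spark.sql(
     "SELECT * FROM lineitem WHERE l_orderkey IN (SELECT o_orderkey FROM orders)")
   // RewritePredicateSubquery turns this into, conceptually:
   //   lineitem LEFT SEMI JOIN orders ON l_orderkey = o_orderkey
   // Re-running InferFiltersFromConstraints afterwards can then push
   //   isnotnull(l_orderkey) / isnotnull(o_orderkey)
   // filters below the join, so the plan lines up with existing exchanges and can reuse them.
   ```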
   
   
   
   
   ### Why are the changes needed?
   Can improve query performance.
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   No
   
   ### How was this patch tested?
   
   PlanStability test





[GitHub] [spark] beliefer opened a new pull request, #40265: [SPARK-42556][CONNECT] Dataset.colregex should link a plan_id when it only matches a single column.

2023-03-02 Thread via GitHub


beliefer opened a new pull request, #40265:
URL: https://github.com/apache/spark/pull/40265

   ### What changes were proposed in this pull request?
   When `colregex` returns a single column, it should link the plan's `plan_id`. For reference, here is the non-connect Dataset code that does this:
   https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L1512
   This also needs to be fixed for the Python client.
   
   
   ### Why are the changes needed?
   Let the `UnresolvedAttribute` link the `plan_id` if it exists.
   
   
   ### Does this PR introduce _any_ user-facing change?
   'No'.
   New feature.
   
   
   ### How was this patch tested?
   New test cases.
   





[GitHub] [spark] chenhao-db commented on pull request #40264: [SPARK-42635][SQL][3.3] Fix the TimestampAdd expression

2023-03-02 Thread via GitHub


chenhao-db commented on PR #40264:
URL: https://github.com/apache/spark/pull/40264#issuecomment-1453062944

   @MaxGekk Please take a look, thanks for reviewing!





[GitHub] [spark] chenhao-db opened a new pull request, #40264: [SPARK-42635][SQL][3.3] Fix the TimestampAdd expression

2023-03-02 Thread via GitHub


chenhao-db opened a new pull request, #40264:
URL: https://github.com/apache/spark/pull/40264

   This is a backport of #40237.
   
   ### What changes were proposed in this pull request?
   This PR fixes the counter-intuitive behaviors of the `TimestampAdd` expression mentioned in https://issues.apache.org/jira/browse/SPARK-42635. See the following *user-facing* changes for details.
   
   ### Does this PR introduce _any_ user-facing change?
   
   Yes. This PR fixes the three problems mentioned in SPARK-42635:
   
   1. When the time is close to daylight saving time transition, the result may 
be discontinuous and not monotonic.
   2. Adding month, quarter, and year silently ignores `Int` overflow during 
unit conversion.
   3. Adding sub-month units (week, day, hour, minute, second, millisecond, microsecond) silently ignores `Long` overflow during unit conversion.
   
   Some examples of the result changes:
   
   Old results:
   
   ```
   // In America/Los_Angeles timezone:
   timestampadd(DAY, 1, 2011-03-12 03:00:00) = 2011-03-13 03:00:00 (this is 
correct, put it here for comparison)
   timestampadd(HOUR, 23, 2011-03-12 03:00:00) = 2011-03-13 03:00:00
   timestampadd(HOUR, 24, 2011-03-12 03:00:00) = 2011-03-13 03:00:00
   timestampadd(SECOND, 86400 - 1, 2011-03-12 03:00:00) = 2011-03-13 03:59:59
   timestampadd(SECOND, 86400, 2011-03-12 03:00:00) = 2011-03-13 03:00:00
   // In UTC timezone:
   timestampadd(quarter, 1431655764, 1970-01-01 00:00:00) = 1969-09-01 00:00:00
   timestampadd(day, 106751992, 1970-01-01 00:00:00) = -290308-12-22 
15:58:10.448384
   ```
   
   New results:
   
   ```
   // In America/Los_Angeles timezone:
   timestampadd(DAY, 1, 2011-03-12 03:00:00) = 2011-03-13 03:00:00
   timestampadd(HOUR, 23, 2011-03-12 03:00:00) = 2011-03-13 03:00:00
   timestampadd(HOUR, 24, 2011-03-12 03:00:00) = 2011-03-13 04:00:00
   timestampadd(SECOND, 86400 - 1, 2011-03-12 03:00:00) = 2011-03-13 03:59:59
   timestampadd(SECOND, 86400, 2011-03-12 03:00:00) = 2011-03-13 04:00:00
   // In UTC timezone:
   timestampadd(quarter, 1431655764, 1970-01-01 00:00:00) = throw overflow 
exception
   timestampadd(day, 106751992, 1970-01-01 00:00:00) = throw overflow exception
   ```
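   
   For context, a hedged sketch (illustrative only, not the actual Spark helper) of the overflow-aware unit conversion the new behavior implies:
   ```scala
   // Adding N quarters means N * 3 months; the multiplication should fail loudly
   // on Int overflow instead of silently wrapping.
   def quartersToMonths(quarters: Int): Int =
     Math.multiplyExact(quarters, 3) // throws ArithmeticException on overflow

   // quartersToMonths(1431655764) now throws; previously the product wrapped around
   // to -4 months, which is how the old result ended up at 1969-09-01.
   ```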
   
   
   ### How was this patch tested?
   
   Pass existing tests and some new tests.
   
   





[GitHub] [spark] MaxGekk commented on pull request #40237: [SPARK-42635][SQL] Fix the TimestampAdd expression.

2023-03-02 Thread via GitHub


MaxGekk commented on PR #40237:
URL: https://github.com/apache/spark/pull/40237#issuecomment-1453059745

   @chenhao-db Congratulations on your first contribution to Apache Spark!





[GitHub] [spark] MaxGekk closed pull request #40237: [SPARK-42635][SQL] Fix the TimestampAdd expression.

2023-03-02 Thread via GitHub


MaxGekk closed pull request #40237: [SPARK-42635][SQL] Fix the TimestampAdd 
expression.
URL: https://github.com/apache/spark/pull/40237





[GitHub] [spark] MaxGekk commented on pull request #40237: [SPARK-42635][SQL] Fix the TimestampAdd expression.

2023-03-02 Thread via GitHub


MaxGekk commented on PR #40237:
URL: https://github.com/apache/spark/pull/40237#issuecomment-1453051043

   @chenhao-db Your changes cause some conflicts in `branch-3.3`. Please open a separate PR with a backport to Spark 3.3.





[GitHub] [spark] MaxGekk commented on pull request #40237: [SPARK-42635][SQL] Fix the TimestampAdd expression.

2023-03-02 Thread via GitHub


MaxGekk commented on PR #40237:
URL: https://github.com/apache/spark/pull/40237#issuecomment-1453050502

   +1, LGTM. Merging to master/3.4.
   Thank you, @chenhao-db.





[GitHub] [spark] the8thC commented on pull request #40236: [SPARK-38735][SQL][Tests] Add tests for the error class: INTERNAL_ERROR

2023-03-02 Thread via GitHub


the8thC commented on PR #40236:
URL: https://github.com/apache/spark/pull/40236#issuecomment-1453042684

   @MaxGekk Can you take a look at this please?





[GitHub] [spark] chenhao-db commented on pull request #40237: [SPARK-42635][SQL] Fix the TimestampAdd expression.

2023-03-02 Thread via GitHub


chenhao-db commented on PR #40237:
URL: https://github.com/apache/spark/pull/40237#issuecomment-1453041214

   @MaxGekk Thanks for reviewing! I guess I need to ask you to merge this PR, right? Also, it should be backported to the older branches that have `TimestampAdd` (3.3.0, 3.3.1, 3.3.2).





[GitHub] [spark] HyukjinKwon closed pull request #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client

2023-03-02 Thread via GitHub


HyukjinKwon closed pull request #40257: [SPARK-42656][CONNECT] Adding SCALA 
REPL shell script for JVM client
URL: https://github.com/apache/spark/pull/40257





[GitHub] [spark] ritikam2 commented on pull request #40116: [SPARK-41391][SQL] The output column name of groupBy.agg(count_distinct) is incorrect

2023-03-02 Thread via GitHub


ritikam2 commented on PR #40116:
URL: https://github.com/apache/spark/pull/40116#issuecomment-1453013580

   @cloud-fan Always using `UnresolvedAlias` seems to be causing the sql-other module to fail. I will revert to the original fix of creating an `UnresolvedAlias` only for `*` or distinct.





[GitHub] [spark] HyukjinKwon commented on pull request #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client

2023-03-02 Thread via GitHub


HyukjinKwon commented on PR #40257:
URL: https://github.com/apache/spark/pull/40257#issuecomment-1453013522

   Merged to master and branch-3.4.





[GitHub] [spark] HyukjinKwon commented on pull request #40238: [SPARK-42633][CONNECT] Make LocalRelation take an actual schema

2023-03-02 Thread via GitHub


HyukjinKwon commented on PR #40238:
URL: https://github.com/apache/spark/pull/40238#issuecomment-1453013336

   test: https://github.com/hvanhovell/spark/actions/runs/4318951397





[GitHub] [spark] zhengruifeng opened a new pull request, #40263: [SPARK-42659][ML] Reimplement `FPGrowthModel.transform` with dataframe operations

2023-03-02 Thread via GitHub


zhengruifeng opened a new pull request, #40263:
URL: https://github.com/apache/spark/pull/40263

   ### What changes were proposed in this pull request?
   Reimplement `FPGrowthModel.transform` with dataframe operations
   
   
   ### Why are the changes needed?
   delay the `collect()` of model dataframe `associationRules`
   
   
   ### Does this PR introduce _any_ user-facing change?
   no
   
   
   ### How was this patch tested?
   existing UT
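   
   For illustration, a hedged sketch of what "transform with dataframe operations" can look like (this is not the actual MLlib implementation; column names follow `associationRules`' `antecedent`/`consequent`, while `itemsCol`/`predictionCol` are assumed parameter names):
   ```scala
   import org.apache.spark.sql.DataFrame
   import org.apache.spark.sql.functions._

   // Join the input against the rules DataFrame instead of collecting the rules
   // to the driver first; rules whose antecedent is contained in the row's items
   // contribute their consequents (minus items already present) to the prediction.
   def transformWithJoin(dataset: DataFrame, rules: DataFrame,
       itemsCol: String, predictionCol: String): DataFrame = {
     val withId = dataset.withColumn("__id", monotonically_increasing_id())
     withId
       .join(rules, expr(s"size(array_except(antecedent, $itemsCol)) = 0"), "left")
       .withColumn("__pred", expr(s"array_except(consequent, $itemsCol)"))
       .groupBy(col("__id"), col(itemsCol))
       .agg(array_distinct(flatten(collect_list(col("__pred")))).as(predictionCol))
       .drop("__id")
   }
   ```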





[GitHub] [spark] mridulm commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache

2023-03-02 Thread via GitHub


mridulm commented on code in PR #39459:
URL: https://github.com/apache/spark/pull/39459#discussion_r1124028359


##
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##
@@ -2468,4 +2468,15 @@ package object config {
   .version("3.4.0")
   .booleanConf
   .createWithDefault(false)
+
+  private[spark] val RDD_CACHE_VISIBILITY_TRACKING_ENABLED =
+    ConfigBuilder("spark.rdd.cache.visibilityTracking.enabled")
+      .internal()
+      .doc("Set to be true to enabled RDD cache block's visibility status. Once it's enabled," +
+        " a RDD cache block can be used only when it's marked as visible. And a RDD block will be" +
+        " marked as visible only when one of the tasks generating the cache block finished" +
+        " successfully. This is relevant in context of consistent accumulator status.")
+      .version("3.4.0")

Review Comment:
   No, we should mark this as 3.5.0 instead ... slipped through the review (it 
was created way before 3.4 was cut).
   
   Thanks for catching this @ulysses-you !
   Can you create a follow up to change this to 3.5 @ivoson ? Thx









[GitHub] [spark] ulysses-you opened a new pull request, #40262: [SPARK-42651][SQL] Optimize global sort to driver sort

2023-03-02 Thread via GitHub


ulysses-you opened a new pull request, #40262:
URL: https://github.com/apache/spark/pull/40262

   
   
   ### What changes were proposed in this pull request?
   
   This PR adds a new physical plan, `DriverSortExec`. It represents the special case of a global `SortExec` that is the root node, so we can save one shuffle.
   
   It also adds a new config `spark.sql.execution.driverSortThreshold` to control whether it is safe to do the sort on the driver side. We should make sure the max rows of the plan is small enough to avoid OOM and performance regressions.
   
   This optimization should work fine since it benefits from the AQE framework: the max rows of the logical plan is accurate and can be propagated across most nodes.
   
   ### Why are the changes needed?
   
   Improve performance
   
   ### Does this PR introduce _any_ user-facing change?
   
   no
   
   ### How was this patch tested?
   
   Added a test and did some benchmarking.
   
   Sort 50,000 rows:
   ```
   OpenJDK 64-Bit Server VM 1.8.0_362-b09 on Linux 5.15.0-1033-azure
   Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
   DriverSort:       Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
   ------------------------------------------------------------------------------------------------
   Sort                       1158           1355         210         0.0       23169.3       1.0X
   DriverSort                  636            680          55         0.1       12715.2       1.8X
   ```
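   
   A hedged usage sketch (the config name is taken from this PR's description; the threshold value and data are illustrative only):
   ```scala
   // A global sort whose result is collected to the driver is exactly the
   // ReturnAnswer + Sort pattern this rule targets.
   spark.conf.set("spark.sql.execution.driverSortThreshold", "50000")
   val top = spark.range(0, 50000).toDF("id")
     .sort(org.apache.spark.sql.functions.col("id").desc)
     .collect()
   ```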





[GitHub] [spark] zhenlineo commented on a diff in pull request #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client

2023-03-02 Thread via GitHub


zhenlineo commented on code in PR #40257:
URL: https://github.com/apache/spark/pull/40257#discussion_r1123993741


##
connector/connect/bin/spark-connect:
##
@@ -0,0 +1,32 @@
+#!/usr/bin/env bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Go to the Spark project root directory
+FWDIR="$(cd "`dirname "$0"`"/../../..; pwd)"
+cd "$FWDIR"
+export SPARK_HOME=$FWDIR
+
+# Build the jars needed for spark submit and spark connect
+build/sbt package

Review Comment:
   This scripts ensures that the jars are built up-to-date. So that we can run 
the client/server to test out the new code quickly. This could be useful for 
example to test/debug udf etc. This is also why we keep it here close to the 
client.






[GitHub] [spark] HyukjinKwon commented on a diff in pull request #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client

2023-03-02 Thread via GitHub


HyukjinKwon commented on code in PR #40257:
URL: https://github.com/apache/spark/pull/40257#discussion_r1123993315


##
connector/connect/bin/spark-connect-scala-client:
##
@@ -0,0 +1,50 @@
+#!/usr/bin/env bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Use the spark connect JVM client to connect to a spark connect server.
+#
+# Start a local server:
+# A local spark-connect server with default settings can be started using the 
following command:
+#  `connector/connect/bin/spark-connect`
+# The client should be able to connect to this server directly with the 
default client settings.
+#
+# Connect to a remote server:
+# To connect to a remote server, use env var `SPARK_REMOTE` to configure the 
client connection
+# string. e.g.
+#  `export SPARK_REMOTE="sc://<host>:<port>/;token=<token>;<param>=<value>"`
+
+# Go to the Spark project root directory
+FWDIR="$(cd "`dirname "$0"`"/../../..; pwd)"
+cd "$FWDIR"
+export SPARK_HOME=$FWDIR
+
+# Build the jars needed for spark connect JVM client
+build/sbt "sql/package;connect-client-jvm/assembly"

Review Comment:
   sorry I misread the path.



##
connector/connect/bin/spark-connect:
##
@@ -0,0 +1,32 @@
+#!/usr/bin/env bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Go to the Spark project root directory
+FWDIR="$(cd "`dirname "$0"`"/../../..; pwd)"
+cd "$FWDIR"
+export SPARK_HOME=$FWDIR
+
+# Build the jars needed for spark submit and spark connect
+build/sbt package

Review Comment:
   sorry I misread the path.






[GitHub] [spark] zhengruifeng commented on a diff in pull request #40260: [SPARK-42630][CONNECT][PYTHON] Delay parsing DDL string until SparkConnectClient is available

2023-03-02 Thread via GitHub


zhengruifeng commented on code in PR #40260:
URL: https://github.com/apache/spark/pull/40260#discussion_r1123992438


##
python/pyspark/sql/connect/_typing.py:
##
@@ -57,7 +57,7 @@ class UserDefinedFunctionLike(Protocol):
 deterministic: bool
 
 @property
-def returnType(self) -> DataType:
+def returnType(self) -> DataTypeOrString:

Review Comment:
   Yeah, but I am still confused about it:
   
   the old implementation
   `PySparkSession.builder.getOrCreate().createDataFrame(data=[], schema=data_type).schema` works.
   
   I also tried
   ```
   session = PySparkSession.builder.getOrCreate()
   parsed = session.client._analyze(  # type: ignore[attr-defined]
       method="ddl_parse", ddl_string=data_type
   ).parsed
   ```
   and at least the tests passed.
   
   But if I try
   ```
   parsed = PySparkSession.builder.getOrCreate().client._analyze(  # type: ignore[attr-defined]
       method="ddl_parse", ddl_string=data_type
   ).parsed
   ```
   the tests always fail with `ValueError: Cannot invoke RPC on closed channel!`.
   
   Maybe we will have to add a pure Python DDL parser, I don't know.






[GitHub] [spark] HyukjinKwon commented on a diff in pull request #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client

2023-03-02 Thread via GitHub


HyukjinKwon commented on code in PR #40257:
URL: https://github.com/apache/spark/pull/40257#discussion_r1123990449


##
connector/connect/bin/spark-connect:
##
@@ -0,0 +1,32 @@
+#!/usr/bin/env bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Go to the Spark project root directory
+FWDIR="$(cd "`dirname "$0"`"/../../..; pwd)"
+cd "$FWDIR"
+export SPARK_HOME=$FWDIR
+
+# Build the jars needed for spark submit and spark connect
+build/sbt package

Review Comment:
   Should probably put it in the `dev` directory if this is for dev. BTW, we have added the `connect-` prefix to Connect-related scripts, cc @grundprinzip FYI.






[GitHub] [spark] HyukjinKwon commented on a diff in pull request #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client

2023-03-02 Thread via GitHub


HyukjinKwon commented on code in PR #40257:
URL: https://github.com/apache/spark/pull/40257#discussion_r1123990665


##
connector/connect/bin/spark-connect-scala-client:
##
@@ -0,0 +1,50 @@
+#!/usr/bin/env bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Use the spark connect JVM client to connect to a spark connect server.
+#
+# Start a local server:
+# A local spark-connect server with default settings can be started using the 
following command:
+#  `connector/connect/bin/spark-connect`
+# The client should be able to connect to this server directly with the 
default client settings.
+#
+# Connect to a remote server:
+# To connect to a remote server, use env var `SPARK_REMOTE` to configure the 
client connection
+# string. e.g.
+#  `export SPARK_REMOTE="sc://<host>:<port>/;token=<token>;<param>=<value>"`
+
+# Go to the Spark project root directory
+FWDIR="$(cd "`dirname "$0"`"/../../..; pwd)"
+cd "$FWDIR"
+export SPARK_HOME=$FWDIR
+
+# Build the jars needed for spark connect JVM client
+build/sbt "sql/package;connect-client-jvm/assembly"

Review Comment:
   ditto. If this is for dev, I would put this in `dev` directory.









[GitHub] [spark] hvanhovell commented on a diff in pull request #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client

2023-03-02 Thread via GitHub


hvanhovell commented on code in PR #40257:
URL: https://github.com/apache/spark/pull/40257#discussion_r1123990365


##
connector/connect/bin/spark-connect-scala-client:
##
@@ -0,0 +1,50 @@
+#!/usr/bin/env bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Use the spark connect JVM client to connect to a spark connect server.
+#
+# Start a local server:
+# A local spark-connect server with default settings can be started using the 
following command:
+#  `connector/connect/bin/spark-connect`
+# The client should be able to connect to this server directly with the 
default client settings.
+#
+# Connect to a remote server:
+# To connect to a remote server, use env var `SPARK_REMOTE` to configure the 
client connection
+# string. e.g.
+#  `export SPARK_REMOTE="sc://<host>:<port>/;token=<token>;<param>=<value>"`
+
+# Go to the Spark project root directory
+FWDIR="$(cd "`dirname "$0"`"/../../..; pwd)"
+cd "$FWDIR"
+export SPARK_HOME=$FWDIR
+
+# Build the jars needed for spark connect JVM client
+build/sbt "sql/package;connect-client-jvm/assembly"

Review Comment:
   why? this is meant for development.






[GitHub] [spark] HyukjinKwon commented on a diff in pull request #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client

2023-03-02 Thread via GitHub


HyukjinKwon commented on code in PR #40257:
URL: https://github.com/apache/spark/pull/40257#discussion_r1123990101


##
connector/connect/bin/spark-connect-scala-client:
##
@@ -0,0 +1,50 @@
+#!/usr/bin/env bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Use the spark connect JVM client to connect to a spark connect server.
+#
+# Start a local server:
+# A local spark-connect server with default settings can be started using the 
following command:
+#  `connector/connect/bin/spark-connect`
+# The client should be able to connect to this server directly with the 
default client settings.
+#
+# Connect to a remote server:
+# To connect to a remote server, use env var `SPARK_REMOTE` to configure the 
client connection
+# string. e.g.
+#  `export SPARK_REMOTE="sc://<host>:<port>/;token=<token>;<param>=<value>"`
+
+# Go to the Spark project root directory
+FWDIR="$(cd "`dirname "$0"`"/../../..; pwd)"
+cd "$FWDIR"
+export SPARK_HOME=$FWDIR
+
+# Build the jars needed for spark connect JVM client
+build/sbt "sql/package;connect-client-jvm/assembly"

Review Comment:
   Can we make `spark-shell` support this, with `--remote` and `--jars` or 
`--packages` option? That would be ideal. Again I don't mind doing this 
separately.






[GitHub] [spark] HyukjinKwon commented on a diff in pull request #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client

2023-03-02 Thread via GitHub


HyukjinKwon commented on code in PR #40257:
URL: https://github.com/apache/spark/pull/40257#discussion_r1123989785


##
connector/connect/bin/spark-connect-scala-client:
##
@@ -0,0 +1,50 @@
+#!/usr/bin/env bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Use the spark connect JVM client to connect to a spark connect server.
+#
+# Start a local server:
+# A local spark-connect server with default settings can be started using the 
following command:
+#  `connector/connect/bin/spark-connect`
+# The client should be able to connect to this server directly with the 
default client settings.
+#
+# Connect to a remote server:
+# To connect to a remote server, use env var `SPARK_REMOTE` to configure the 
client connection
+# string. e.g.
+#  `export SPARK_REMOTE="sc://<host>:<port>/;token=<token>;<param>=<value>"`
+
+# Go to the Spark project root directory
+FWDIR="$(cd "`dirname "$0"`"/../../..; pwd)"
+cd "$FWDIR"
+export SPARK_HOME=$FWDIR
+
+# Build the jars needed for spark connect JVM client
+build/sbt "sql/package;connect-client-jvm/assembly"

Review Comment:
   It's actually a bit odd to have a build command here ... it won't work once we release this. I don't mind addressing this separately, but we would have to fix it before the release.






[GitHub] [spark] hvanhovell commented on a diff in pull request #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client

2023-03-02 Thread via GitHub


hvanhovell commented on code in PR #40257:
URL: https://github.com/apache/spark/pull/40257#discussion_r1123989425


##
connector/connect/bin/spark-connect:
##
@@ -0,0 +1,32 @@
+#!/usr/bin/env bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Go to the Spark project root directory
+FWDIR="$(cd "`dirname "$0"`"/../../..; pwd)"
+cd "$FWDIR"
+export SPARK_HOME=$FWDIR
+
+# Build the jars needed for spark submit and spark connect
+build/sbt package

Review Comment:
   This is more aimed at development. There are quite a few differences:
   - This builds the code if we need it
   - This does not return after starting the process. It actually logs to the 
console which makes debugging easier.






[GitHub] [spark] amaliujia commented on pull request #40256: [SPARK-42653][CONNECT] Artifact transfer from Scala/JVM client to Server

2023-03-02 Thread via GitHub


amaliujia commented on PR #40256:
URL: https://github.com/apache/spark/pull/40256#issuecomment-1452899534

   Overall looks good. Thank you! 





[GitHub] [spark] amaliujia commented on a diff in pull request #40256: [SPARK-42653][CONNECT] Artifact transfer from Scala/JVM client to Server

2023-03-02 Thread via GitHub


amaliujia commented on code in PR #40256:
URL: https://github.com/apache/spark/pull/40256#discussion_r1123987429


##
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala:
##
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.client
+
+import java.io.InputStream
+import java.net.URI
+import java.nio.file.{Files, Path, Paths}
+import java.util.zip.{CheckedInputStream, CRC32}
+
+import scala.collection.mutable
+import scala.concurrent.Promise
+import scala.concurrent.duration.Duration
+import scala.util.control.NonFatal
+
+import Artifact._
+import com.google.protobuf.ByteString
+import io.grpc.ManagedChannel
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.AddArtifactsResponse
+import org.apache.spark.connect.proto.AddArtifactsResponse.ArtifactSummary
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+/**
+ * The Artifact Manager is responsible for handling and transferring artifacts from the local
+ * client to the server (local/remote).
+ * @param userContext
+ * @param channel
+ */
+class ArtifactManager(userContext: proto.UserContext, channel: ManagedChannel) {
+  // Using the midpoint recommendation of 32KiB for chunk size as specified in
+  // https://github.com/grpc/grpc.github.io/issues/371.
+  private val CHUNK_SIZE: Int = 32 * 1024
+
+  private[this] val stub = proto.SparkConnectServiceGrpc.newStub(channel)
+
+  /**
+   * Add a single artifact to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifact(path: String): Unit = {
+addArtifact(Utils.resolveURI(path))
+  }
+
+  private def parseArtifacts(uri: URI): Seq[Artifact] = {
+    // Currently only local files with extensions .jar and .class are supported.
+uri.getScheme match {
+  case "file" =>
+val path = Paths.get(uri)
+val artifact = path.getFileName.toString match {
+  case jar if jar.endsWith(".jar") =>
+newJarArtifact(path.getFileName, new LocalFile(path))
+  case cf if cf.endsWith(".class") =>
+newClassArtifact(path.getFileName, new LocalFile(path))
+  case other =>
+            throw new UnsupportedOperationException(s"Unsuppoted file format: $other")
+}
+Seq[Artifact](artifact)
+
+  case other =>
+throw new UnsupportedOperationException(s"Unsupported scheme: $other")
+}
+  }
+
+  /**
+   * Add a single artifact to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifact(uri: URI): Unit = addArtifacts(parseArtifacts(uri))
+
+  /**
+   * Add multiple artifacts to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifacts(uris: Seq[URI]): Unit = addArtifacts(uris.flatMap(parseArtifacts))
+
+  /**
+   * Add a number of artifacts to the session.
+   */
+  private def addArtifacts(artifacts: Iterable[Artifact]): Unit = {
+val promise = Promise[Seq[ArtifactSummary]]
+val responseHandler = new StreamObserver[proto.AddArtifactsResponse] {
+  private val summaries = mutable.Buffer.empty[ArtifactSummary]
+  override def onNext(v: AddArtifactsResponse): Unit = {
+v.getArtifactsList.forEach { summary =>
+  summaries += summary
+}
+  }
+  override def onError(throwable: Throwable): Unit = {
+promise.failure(throwable)
+  }
+  override def onCompleted(): Unit = {
+promise.success(summaries.toSeq)
+  }
+}
+val stream = stub.addArtifacts(responseHandler)
+val currentBatch = mutable.Buffer.empty[Artifact]
+var currentBatchSize = 0L
+
+def addToBatch(dep: Artifact, size: Long): Unit = {
+  currentBatch += dep
+  currentBatchSize += size
+}
+
+def writeBatch(): Unit = {
+  addBatchedArtifacts(currentBatch.toSeq, stream)
+  currentBatch.clear()
+  currentBatchSize = 0
+}
+
+artifacts.iterator.foreach { artifact =>
+  val data = artifact.storage
+  val size = data.size

[GitHub] [spark] HyukjinKwon commented on a diff in pull request #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client

2023-03-02 Thread via GitHub


HyukjinKwon commented on code in PR #40257:
URL: https://github.com/apache/spark/pull/40257#discussion_r1123987387


##
connector/connect/bin/spark-connect:
##
@@ -0,0 +1,32 @@
+#!/usr/bin/env bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Go to the Spark project root directory
+FWDIR="$(cd "`dirname "$0"`"/../../..; pwd)"
+cd "$FWDIR"
+export SPARK_HOME=$FWDIR
+
+# Build the jars needed for spark submit and spark connect
+build/sbt package

Review Comment:
   Hm, I guess this can be replaced by `./sbin/start-connect-server.sh --jars $(ls connector/connect/server/target/**/spark-connect*SNAPSHOT.jar)`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] hvanhovell commented on a diff in pull request #40256: [SPARK-42653][CONNECT] Artifact transfer from Scala/JVM client to Server

2023-03-02 Thread via GitHub


hvanhovell commented on code in PR #40256:
URL: https://github.com/apache/spark/pull/40256#discussion_r1123986940


##
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala:
##
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.client
+
+import java.io.InputStream
+import java.net.URI
+import java.nio.file.{Files, Path, Paths}
+import java.util.zip.{CheckedInputStream, CRC32}
+
+import scala.collection.mutable
+import scala.concurrent.Promise
+import scala.concurrent.duration.Duration
+import scala.util.control.NonFatal
+
+import Artifact._
+import com.google.protobuf.ByteString
+import io.grpc.ManagedChannel
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.AddArtifactsResponse
+import org.apache.spark.connect.proto.AddArtifactsResponse.ArtifactSummary
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+/**
+ * The Artifact Manager is responsible for handling and transferring artifacts 
from the local
+ * client to the server (local/remote).
+ * @param userContext
+ * @param channel
+ */
+class ArtifactManager(userContext: proto.UserContext, channel: ManagedChannel) 
{
+  // Using the midpoint recommendation of 32KiB for chunk size as specified in
+  // https://github.com/grpc/grpc.github.io/issues/371.
+  private val CHUNK_SIZE: Int = 32 * 1024
+
+  private[this] val stub = proto.SparkConnectServiceGrpc.newStub(channel)
+
+  /**
+   * Add a single artifact to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifact(path: String): Unit = {
+addArtifact(Utils.resolveURI(path))
+  }
+
+  private def parseArtifacts(uri: URI): Seq[Artifact] = {
+// Currently only local files with extensions .jar and .class are 
supported.
+uri.getScheme match {
+  case "file" =>
+val path = Paths.get(uri)
+val artifact = path.getFileName.toString match {
+  case jar if jar.endsWith(".jar") =>
+newJarArtifact(path.getFileName, new LocalFile(path))
+  case cf if cf.endsWith(".class") =>
+newClassArtifact(path.getFileName, new LocalFile(path))
+  case other =>
+throw new UnsupportedOperationException(s"Unsuppoted file format: 
$other")
+}
+Seq[Artifact](artifact)
+
+  case other =>
+throw new UnsupportedOperationException(s"Unsupported scheme: $other")
+}
+  }
+
+  /**
+   * Add a single artifact to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifact(uri: URI): Unit = addArtifacts(parseArtifacts(uri))
+
+  /**
+   * Add multiple artifacts to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifacts(uris: Seq[URI]): Unit = 
addArtifacts(uris.flatMap(parseArtifacts))
+
+  /**
+   * Add a number of artifacts to the session.
+   */
+  private def addArtifacts(artifacts: Iterable[Artifact]): Unit = {
+val promise = Promise[Seq[ArtifactSummary]]
+val responseHandler = new StreamObserver[proto.AddArtifactsResponse] {
+  private val summaries = mutable.Buffer.empty[ArtifactSummary]
+  override def onNext(v: AddArtifactsResponse): Unit = {
+v.getArtifactsList.forEach { summary =>
+  summaries += summary
+}
+  }
+  override def onError(throwable: Throwable): Unit = {
+promise.failure(throwable)
+  }
+  override def onCompleted(): Unit = {
+promise.success(summaries.toSeq)
+  }
+}
+val stream = stub.addArtifacts(responseHandler)
+val currentBatch = mutable.Buffer.empty[Artifact]
+var currentBatchSize = 0L
+
+def addToBatch(dep: Artifact, size: Long): Unit = {
+  currentBatch += dep
+  currentBatchSize += size
+}
+
+def writeBatch(): Unit = {
+  addBatchedArtifacts(currentBatch.toSeq, stream)
+  currentBatch.clear()
+  currentBatchSize = 0
+}
+
+artifacts.iterator.foreach { artifact =>
+  val data = artifact.storage
+  val size = data.size

[GitHub] [spark] wangyum commented on a diff in pull request #40190: [SPARK-42597][SQL] UnwrapCastInBinaryComparison support unwrap timestamp type

2023-03-02 Thread via GitHub


wangyum commented on code in PR #40190:
URL: https://github.com/apache/spark/pull/40190#discussion_r1123984775


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/UnwrapCastInBinaryComparison.scala:
##
@@ -293,6 +299,34 @@ object UnwrapCastInBinaryComparison extends 
Rule[LogicalPlan] {
 }
   }
 
+  /**
+   * Move the cast to the literal side, because we can only get the minimum 
value of timestamp,
+   * so some BinaryComparison needs to be changed,
+   * such as CAST(ts AS date) > DATE '2023-01-01' ===> ts >= TIMESTAMP 
'2023-01-02 00:00:00'

Review Comment:
   This is because it is easy to get the start-of-day timestamp, but not easy to get the end-of-day timestamp.
   ```sql
   spark-sql> select cast(date'2023-01-01' as timestamp);
   2023-01-01 00:00:00
   ```
   vs
   ```sql
   spark-sql> select cast(concat(cast(date'2023-01-01' as string), ' 
23:59:59.99') as timestamp);
   2023-01-01 23:59:59.99
   ```
   
   
   It will truncate if casting from long type to timestamp type:
   ```sql
   spark-sql> select cast(cast(cast(date'2023-01-01' as timestamp) as long) + 
(24* 60 * 60 - 1) as timestamp);
   2023-01-01 23:59:59
   ```
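   
   As an illustration of the idea (not the exact Catalyst rule in this PR), a small self-contained sketch of how each comparison against a date literal can be rewritten using only start-of-day timestamps:
   
   ```scala
   import java.time.{LocalDate, LocalDateTime}
   
   // Illustrative stand-ins for Catalyst's binary comparison operators.
   sealed trait Op
   case object GT extends Op; case object GE extends Op
   case object LT extends Op; case object LE extends Op
   
   // Rewrites `CAST(ts AS date) <op> d` into an equivalent predicate on `ts`,
   // using only start-of-day timestamps (the bound that is easy to compute).
   def unwrapDateCast(op: Op, d: LocalDate): (Op, LocalDateTime) = op match {
     case GT => (GE, d.plusDays(1).atStartOfDay()) // ts >= d+1 00:00:00
     case GE => (GE, d.atStartOfDay())             // ts >= d   00:00:00
     case LT => (LT, d.atStartOfDay())             // ts <  d   00:00:00
     case LE => (LT, d.plusDays(1).atStartOfDay()) // ts <  d+1 00:00:00
   }
   
   // Example: CAST(ts AS date) > DATE '2023-01-01'  ===>  ts >= TIMESTAMP '2023-01-02 00:00:00'
   // unwrapDateCast(GT, LocalDate.parse("2023-01-01")) == (GE, LocalDateTime.parse("2023-01-02T00:00"))
   ```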



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] ueshin commented on a diff in pull request #40260: [SPARK-42630][CONNECT][PYTHON] Delay parsing DDL string until SparkConnectClient is available

2023-03-02 Thread via GitHub


ueshin commented on code in PR #40260:
URL: https://github.com/apache/spark/pull/40260#discussion_r1123979423


##
python/pyspark/sql/connect/_typing.py:
##
@@ -57,7 +57,7 @@ class UserDefinedFunctionLike(Protocol):
 deterministic: bool
 
 @property
-def returnType(self) -> DataType:
+def returnType(self) -> DataTypeOrString:

Review Comment:
   Yes, that's true.
   However, the client is not available here, and we don't have a proper way to parse the string at this point.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] ulysses-you commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache

2023-03-02 Thread via GitHub


ulysses-you commented on code in PR #39459:
URL: https://github.com/apache/spark/pull/39459#discussion_r1123978338


##
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##
@@ -2468,4 +2468,15 @@ package object config {
   .version("3.4.0")
   .booleanConf
   .createWithDefault(false)
+
+  private[spark] val RDD_CACHE_VISIBILITY_TRACKING_ENABLED =
+ConfigBuilder("spark.rdd.cache.visibilityTracking.enabled")
+  .internal()
+  .doc("Set to be true to enabled RDD cache block's visibility status. 
Once it's enabled," +
+" a RDD cache block can be used only when it's marked as visible. And 
a RDD block will be" +
+" marked as visible only when one of the tasks generating the cache 
block finished" +
+" successfully. This is relevant in context of consistent accumulator 
status.")
+  .version("3.4.0")

Review Comment:
   According to the commit history, it seems this PR lands on the master branch (3.5.0). Should this be backported to 3.4.0, or should the version be updated?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] xinrong-meng commented on a diff in pull request #40260: [SPARK-42630][CONNECT][PYTHON] Delay parsing DDL string until SparkConnectClient is available

2023-03-02 Thread via GitHub


xinrong-meng commented on code in PR #40260:
URL: https://github.com/apache/spark/pull/40260#discussion_r1123976155


##
python/pyspark/sql/connect/_typing.py:
##
@@ -57,7 +57,7 @@ class UserDefinedFunctionLike(Protocol):
 deterministic: bool
 
 @property
-def returnType(self) -> DataType:
+def returnType(self) -> DataTypeOrString:

Review Comment:
   Oh yes, I commented separately below.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] xinrong-meng commented on a diff in pull request #40260: [SPARK-42630][CONNECT][PYTHON] Delay parsing DDL string until SparkConnectClient is available

2023-03-02 Thread via GitHub


xinrong-meng commented on code in PR #40260:
URL: https://github.com/apache/spark/pull/40260#discussion_r1123974740


##
python/pyspark/sql/connect/udf.py:
##
@@ -99,9 +97,7 @@ def __init__(
 )
 
 self.func = func
-self._returnType = (
-parse_data_type(returnType) if isinstance(returnType, str) else 
returnType
-)
+self._returnType = returnType

Review Comment:
   In vanilla PySpark, a `returnType` property is defined at https://github.com/apache/spark/blob/master/python/pyspark/sql/udf.py#L233, which always returns a `DataType`.
   
   In the existing Connect code, `wrapper.returnType = self._returnType` is used instead.
   
   I am afraid this line of change may break that property parity.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] HyukjinKwon commented on pull request #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client

2023-03-02 Thread via GitHub


HyukjinKwon commented on PR #40257:
URL: https://github.com/apache/spark/pull/40257#issuecomment-1452877870

   Seems like tests did not pass, and it fails in the master branch 
(https://github.com/apache/spark/actions/runs/4319488463/jobs/7538760733).
   
   Let me quickly revert this for now.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] xinrong-meng commented on a diff in pull request #40260: [SPARK-42630][CONNECT][PYTHON] Delay parsing DDL string until SparkConnectClient is available

2023-03-02 Thread via GitHub


xinrong-meng commented on code in PR #40260:
URL: https://github.com/apache/spark/pull/40260#discussion_r1123974740


##
python/pyspark/sql/connect/udf.py:
##
@@ -99,9 +97,7 @@ def __init__(
 )
 
 self.func = func
-self._returnType = (
-parse_data_type(returnType) if isinstance(returnType, str) else 
returnType
-)
+self._returnType = returnType

Review Comment:
   In vanilla PySpark, a `returnType` property is defined at https://github.com/apache/spark/blob/master/python/pyspark/sql/udf.py#L233, which always returns a `DataType`.
   
   In the existing Connect code, `wrapper.returnType = self._returnType` is used instead.
   
   I am afraid that line of change may break that property parity.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] xinrong-meng commented on a diff in pull request #40260: [SPARK-42630][CONNECT][PYTHON] Delay parsing DDL string until SparkConnectClient is available

2023-03-02 Thread via GitHub


xinrong-meng commented on code in PR #40260:
URL: https://github.com/apache/spark/pull/40260#discussion_r1123974740


##
python/pyspark/sql/connect/udf.py:
##
@@ -99,9 +97,7 @@ def __init__(
 )
 
 self.func = func
-self._returnType = (
-parse_data_type(returnType) if isinstance(returnType, str) else 
returnType
-)
+self._returnType = returnType

Review Comment:
   In vanilla PySpark, a `returnType` property is defined at https://github.com/apache/spark/blob/master/python/pyspark/sql/udf.py#L233, which always returns a `DataType`.
   
   In the existing Connect code, `wrapper.returnType = self._returnType` is used instead.
   
   I am afraid that line of change may break that property parity.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] zhengruifeng commented on a diff in pull request #40260: [SPARK-42630][CONNECT][PYTHON] Delay parsing DDL string until SparkConnectClient is available

2023-03-02 Thread via GitHub


zhengruifeng commented on code in PR #40260:
URL: https://github.com/apache/spark/pull/40260#discussion_r1123974022


##
python/pyspark/sql/connect/_typing.py:
##
@@ -57,7 +57,7 @@ class UserDefinedFunctionLike(Protocol):
 deterministic: bool
 
 @property
-def returnType(self) -> DataType:
+def returnType(self) -> DataTypeOrString:

Review Comment:
   Does this introduce a behavior change?
   
   
https://github.com/apache/spark/blob/6ff760d483124b121d79c3a2d5fdc3ee3f27dd00/python/pyspark/sql/_typing.pyi#L70-L77
   
   It seems that the PySpark UDF's `returnType` is always a `DataType`
   
   cc @xinrong-meng 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] zhengruifeng commented on pull request #40261: [SPARK-42615][CONNECT][FOLLOWUP] Fix SparkConnectAnalyzeHandler to use withActive

2023-03-02 Thread via GitHub


zhengruifeng commented on PR #40261:
URL: https://github.com/apache/spark/pull/40261#issuecomment-1452864911

   thank you, merged into master/3.4


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] zhengruifeng closed pull request #40261: [SPARK-42615][CONNECT][FOLLOWUP] Fix SparkConnectAnalyzeHandler to use withActive

2023-03-02 Thread via GitHub


zhengruifeng closed pull request #40261: [SPARK-42615][CONNECT][FOLLOWUP] Fix 
SparkConnectAnalyzeHandler to use withActive
URL: https://github.com/apache/spark/pull/40261


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] amaliujia commented on a diff in pull request #40256: [SPARK-42653][CONNECT] Artifact transfer from Scala/JVM client to Server

2023-03-02 Thread via GitHub


amaliujia commented on code in PR #40256:
URL: https://github.com/apache/spark/pull/40256#discussion_r1123965938


##
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala:
##
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.client
+
+import java.io.InputStream
+import java.net.URI
+import java.nio.file.{Files, Path, Paths}
+import java.util.zip.{CheckedInputStream, CRC32}
+
+import scala.collection.mutable
+import scala.concurrent.Promise
+import scala.concurrent.duration.Duration
+import scala.util.control.NonFatal
+
+import Artifact._
+import com.google.protobuf.ByteString
+import io.grpc.ManagedChannel
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.AddArtifactsResponse
+import org.apache.spark.connect.proto.AddArtifactsResponse.ArtifactSummary
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+/**
+ * The Artifact Manager is responsible for handling and transferring artifacts 
from the local
+ * client to the server (local/remote).
+ * @param userContext
+ * @param channel
+ */
+class ArtifactManager(userContext: proto.UserContext, channel: ManagedChannel) 
{
+  // Using the midpoint recommendation of 32KiB for chunk size as specified in
+  // https://github.com/grpc/grpc.github.io/issues/371.
+  private val CHUNK_SIZE: Int = 32 * 1024
+
+  private[this] val stub = proto.SparkConnectServiceGrpc.newStub(channel)
+
+  /**
+   * Add a single artifact to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifact(path: String): Unit = {
+addArtifact(Utils.resolveURI(path))
+  }
+
+  private def parseArtifacts(uri: URI): Seq[Artifact] = {
+// Currently only local files with extensions .jar and .class are 
supported.
+uri.getScheme match {
+  case "file" =>
+val path = Paths.get(uri)
+val artifact = path.getFileName.toString match {
+  case jar if jar.endsWith(".jar") =>
+newJarArtifact(path.getFileName, new LocalFile(path))
+  case cf if cf.endsWith(".class") =>
+newClassArtifact(path.getFileName, new LocalFile(path))
+  case other =>
+throw new UnsupportedOperationException(s"Unsuppoted file format: 
$other")
+}
+Seq[Artifact](artifact)
+
+  case other =>
+throw new UnsupportedOperationException(s"Unsupported scheme: $other")
+}
+  }
+
+  /**
+   * Add a single artifact to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifact(uri: URI): Unit = addArtifacts(parseArtifacts(uri))
+
+  /**
+   * Add multiple artifacts to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifacts(uris: Seq[URI]): Unit = 
addArtifacts(uris.flatMap(parseArtifacts))
+
+  /**
+   * Add a number of artifacts to the session.
+   */
+  private def addArtifacts(artifacts: Iterable[Artifact]): Unit = {
+val promise = Promise[Seq[ArtifactSummary]]
+val responseHandler = new StreamObserver[proto.AddArtifactsResponse] {
+  private val summaries = mutable.Buffer.empty[ArtifactSummary]
+  override def onNext(v: AddArtifactsResponse): Unit = {
+v.getArtifactsList.forEach { summary =>
+  summaries += summary
+}
+  }
+  override def onError(throwable: Throwable): Unit = {
+promise.failure(throwable)
+  }
+  override def onCompleted(): Unit = {
+promise.success(summaries.toSeq)
+  }
+}
+val stream = stub.addArtifacts(responseHandler)
+val currentBatch = mutable.Buffer.empty[Artifact]
+var currentBatchSize = 0L
+
+def addToBatch(dep: Artifact, size: Long): Unit = {
+  currentBatch += dep
+  currentBatchSize += size
+}
+
+def writeBatch(): Unit = {
+  addBatchedArtifacts(currentBatch.toSeq, stream)
+  currentBatch.clear()
+  currentBatchSize = 0
+}
+
+artifacts.iterator.foreach { artifact =>
+  val data = artifact.storage
+  val size = data.size

[GitHub] [spark] LuciferYang commented on pull request #40213: [SPARK-42599][CONNECT][INFRA] Introduce `dev/connect-jvm-client-mima-check` instead of `CompatibilitySuite`

2023-03-02 Thread via GitHub


LuciferYang commented on PR #40213:
URL: https://github.com/apache/spark/pull/40213#issuecomment-1452840888

   Thanks @hvanhovell @amaliujia @zhenlineo 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] hvanhovell closed pull request #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client

2023-03-02 Thread via GitHub


hvanhovell closed pull request #40257: [SPARK-42656][CONNECT] Adding SCALA REPL 
shell script for JVM client
URL: https://github.com/apache/spark/pull/40257


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] hvanhovell commented on pull request #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client

2023-03-02 Thread via GitHub


hvanhovell commented on PR #40257:
URL: https://github.com/apache/spark/pull/40257#issuecomment-1452827799

   Merging this.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] ueshin commented on a diff in pull request #40238: [SPARK-42633][CONNECT] Make LocalRelation take an actual schema

2023-03-02 Thread via GitHub


ueshin commented on code in PR #40238:
URL: https://github.com/apache/spark/pull/40238#discussion_r1123930392


##
connector/connect/common/src/main/protobuf/spark/connect/relations.proto:
##
@@ -353,11 +353,16 @@ message LocalRelation {
   optional bytes data = 1;
 
   // (Optional) The schema of local data.
-  // It should be either a DDL-formatted type string or a JSON string.
   //
   // The server side will update the column names and data types according to 
this schema.
   // If the 'data' is not provided, then this schema will be required.
-  optional string schema = 2;
+  oneof schema {

Review Comment:
   Good point. IIRC, there was a decision not to implement it in Python.
   cc @HyukjinKwon @zhengruifeng 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] hvanhovell commented on a diff in pull request #40238: [SPARK-42633][CONNECT] Make LocalRelation take an actual schema

2023-03-02 Thread via GitHub


hvanhovell commented on code in PR #40238:
URL: https://github.com/apache/spark/pull/40238#discussion_r1123926302


##
connector/connect/common/src/main/protobuf/spark/connect/relations.proto:
##
@@ -353,11 +353,16 @@ message LocalRelation {
   optional bytes data = 1;
 
   // (Optional) The schema of local data.
-  // It should be either a DDL-formatted type string or a JSON string.
   //
   // The server side will update the column names and data types according to 
this schema.
   // If the 'data' is not provided, then this schema will be required.
-  optional string schema = 2;
+  oneof schema {

Review Comment:
   @ueshin why not implement a parser on the Python side? That shouldn't be too hard, and it would save you from doing a lot of expensive RPCs.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] srowen commented on pull request #40220: [SPARK-42647][PYTHON] Change alias for numpy deprecated and removed types

2023-03-02 Thread via GitHub


srowen commented on PR #40220:
URL: https://github.com/apache/spark/pull/40220#issuecomment-1452779722

   Merged to master/3.4/3.3, for consistency with 
https://issues.apache.org/jira/browse/SPARK-40376


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] srowen closed pull request #40220: [SPARK-42647][PYTHON] Change alias for numpy deprecated and removed types

2023-03-02 Thread via GitHub


srowen closed pull request #40220: [SPARK-42647][PYTHON] Change alias for numpy 
deprecated and removed types
URL: https://github.com/apache/spark/pull/40220


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] ueshin commented on a diff in pull request #40238: [SPARK-42633][CONNECT] Make LocalRelation take an actual schema

2023-03-02 Thread via GitHub


ueshin commented on code in PR #40238:
URL: https://github.com/apache/spark/pull/40238#discussion_r1123923286


##
connector/connect/common/src/main/protobuf/spark/connect/relations.proto:
##
@@ -353,11 +353,16 @@ message LocalRelation {
   optional bytes data = 1;
 
   // (Optional) The schema of local data.
-  // It should be either a DDL-formatted type string or a JSON string.
   //
   // The server side will update the column names and data types according to 
this schema.
   // If the 'data' is not provided, then this schema will be required.
-  optional string schema = 2;
+  oneof schema {

Review Comment:
   I think this change is fine. I'd rather change all places to accept both a string and a `DataType`, and leave the choice up to the client developers.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] ueshin commented on a diff in pull request #40238: [SPARK-42633][CONNECT] Make LocalRelation take an actual schema

2023-03-02 Thread via GitHub


ueshin commented on code in PR #40238:
URL: https://github.com/apache/spark/pull/40238#discussion_r1123920584


##
connector/connect/common/src/main/protobuf/spark/connect/relations.proto:
##
@@ -353,11 +353,16 @@ message LocalRelation {
   optional bytes data = 1;
 
   // (Optional) The schema of local data.
-  // It should be either a DDL-formatted type string or a JSON string.
   //
   // The server side will update the column names and data types according to 
this schema.
   // If the 'data' is not provided, then this schema will be required.
-  optional string schema = 2;
+  oneof schema {

Review Comment:
   In some cases, we need to parse it beforehand anyway. For the case of #40240, we can't create a converter from the local Python object to an Arrow table without the schema as a `DataType` object.
   
   Another example is UDFs. A UDF needs to pickle the function along with its return type as a `DataType` object. We can't generate the Python `DataType` object and pickle it into the `command` field on the server side anymore, so we need to parse the DDL string beforehand.
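   
   To make the trade-off concrete, here is a minimal sketch (assumed names, not the actual client API) of keeping the user-supplied return type unresolved until something that can parse DDL, for example the `DDLParse` RPC mentioned above, becomes available:
   
   ```scala
   import org.apache.spark.sql.types.DataType
   
   // Hypothetical holder: the return type stays as the raw DDL string (or an
   // already-built DataType) until a DDL parser is available.
   final case class DeferredReturnType(raw: Either[String, DataType]) {
     def resolve(parseDdl: String => DataType): DataType = raw match {
       case Left(ddl) => parseDdl(ddl) // e.g. delegate to the server's DDLParse RPC
       case Right(dt) => dt
     }
   }
   
   // Usage sketch (`client.ddlParse` is an assumed helper, not a real API):
   // val rt = DeferredReturnType(Left("struct<a: int, b: string>"))
   // val dataType = rt.resolve(ddl => client.ddlParse(ddl))
   ```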



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] ivoson commented on pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache

2023-03-02 Thread via GitHub


ivoson commented on PR #39459:
URL: https://github.com/apache/spark/pull/39459#issuecomment-1452774796

   Thanks for the review and help on this. @mridulm @Ngone51 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] zhenlineo commented on a diff in pull request #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client

2023-03-02 Thread via GitHub


zhenlineo commented on code in PR #40257:
URL: https://github.com/apache/spark/pull/40257#discussion_r1123913129


##
bin/spark-connect-scala-client.sc:
##
@@ -0,0 +1,15 @@
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.SparkSession
+
+val conStr = if (sys.env.contains("SPARK_REMOTE")) sys.env("SPARK_REMOTE") 
else ""

Review Comment:
   This feature request is a bit hard :D I cannot figure out a way to pass the args to the program rather than to the Scala REPL.
   
   I think it relates to '-i'. We need '-i' to keep the REPL running, but then the REPL treats all args as arguments to the REPL rather than to the Scala script.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] zhenlineo commented on a diff in pull request #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client

2023-03-02 Thread via GitHub


zhenlineo commented on code in PR #40257:
URL: https://github.com/apache/spark/pull/40257#discussion_r1123911014


##
bin/spark-connect-scala-client:
##
@@ -0,0 +1,26 @@
+#!/usr/bin/env bash
+
+# Use the spark connect JVM client to connect to a spark connect server.
+#
+# Start a local server:
+# A local spark-connect server with default settings can be started using the 
following command:
+#  `bin/spark-connect`
+# The client should be able to connect to this server directly with the 
default client settings.
+#
+# Connect to a remote server:
+# To connect to a remote server, use env var `SPARK_REMOTE` to configure the 
client connection
+# string. e.g.
+#  `export SPARK_REMOTE="sc://<host>:<port>/;token=<token>;<param>=<value>"`
+
+if [ -z "${SPARK_HOME}" ]; then
+  source "$(dirname "$0")"/find-spark-home
+fi
+
+# Build the jars needed for spark connect JVM client
+build/sbt "sql/package;connect-client-jvm/assembly"
+
+CONNECT_CLASSPATH="$(build/sbt -DcopyDependencies=false "export 
connect-client-jvm/fullClasspath" | grep jar | tail -n1)"
+SQL_CLASSPATH="$(build/sbt -DcopyDependencies=false "export sql/fullClasspath" 
| grep jar | tail -n1)"
+
+INIT_SCRIPT="${SPARK_HOME}"/bin/spark-connect-scala-client.sc
+"${SPARK_HOME}"/build/scala*/bin/scala -cp "$CONNECT_CLASSPATH:$SQL_CLASSPATH" 
-i $INIT_SCRIPT

Review Comment:
   Now it will find the version set in the top-level pom file.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] zhenlineo commented on a diff in pull request #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client

2023-03-02 Thread via GitHub


zhenlineo commented on code in PR #40257:
URL: https://github.com/apache/spark/pull/40257#discussion_r1123910664


##
bin/spark-connect-scala-client.sc:
##
@@ -0,0 +1,15 @@
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.SparkSession
+
+val conStr = if (sys.env.contains("SPARK_REMOTE")) sys.env("SPARK_REMOTE") 
else ""
+val sessionBuilder = SparkSession.builder()
+val spark = if (conStr.isEmpty) sessionBuilder.build() else 
sessionBuilder.remote(conStr).build()

Review Comment:
   The user agent, if not set, defaults to "_SPARK_CONNECT_SCALA". Do you want another name for the script client?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] beliefer commented on pull request #40252: [SPARK-42555][CONNECT] Add JDBC to DataFrameReader

2023-03-02 Thread via GitHub


beliefer commented on PR #40252:
URL: https://github.com/apache/spark/pull/40252#issuecomment-1452763000

   > I guess you can refer to `JDBCSuite` and `ClientE2ETestSuite` ?
   
   The built-in H2 runs on the server side, and we can't start the H2 database from the Connect API.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] ueshin commented on a diff in pull request #40210: [SPARK-42615][CONNECT][PYTHON] Refactor the AnalyzePlan RPC and add `session.version`

2023-03-02 Thread via GitHub


ueshin commented on code in PR #40210:
URL: https://github.com/apache/spark/pull/40210#discussion_r1123898851


##
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAnalyzeHandler.scala:
##
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import scala.collection.JavaConverters._
+
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.connect.proto
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{Dataset, SparkSession}
+import org.apache.spark.sql.connect.common.{DataTypeProtoConverter, 
InvalidPlanInput}
+import org.apache.spark.sql.connect.planner.SparkConnectPlanner
+import org.apache.spark.sql.execution.{CodegenMode, CostMode, ExtendedMode, 
FormattedMode, SimpleMode}
+
+private[connect] class SparkConnectAnalyzeHandler(
+responseObserver: StreamObserver[proto.AnalyzePlanResponse])
+extends Logging {
+
+  def handle(request: proto.AnalyzePlanRequest): Unit = {
+val session =
+  SparkConnectService
+.getOrCreateIsolatedSession(request.getUserContext.getUserId, 
request.getClientId)
+.session
+
+val response = process(request, session)
+responseObserver.onNext(response)
+responseObserver.onCompleted()

Review Comment:
   #40261



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] ueshin opened a new pull request, #40261: [SPARK-42615][CONNECT][FOLLOWUP] Fix SparkConnectAnalyzeHandler to use withActive

2023-03-02 Thread via GitHub


ueshin opened a new pull request, #40261:
URL: https://github.com/apache/spark/pull/40261

   ### What changes were proposed in this pull request?
   
   Fix `SparkConnectAnalyzeHandler` to use `withActive`.
   
   ### Why are the changes needed?
   
   Similar to #40165, `SQLConf.get` is necessary when transforming the proto to 
plans.
   
   ### Does this PR introduce _any_ user-facing change?
   
   No.
   
   ### How was this patch tested?
   
   Existing tests.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] vicennial commented on a diff in pull request #40256: [SPARK-42653][CONNECT] Artifact transfer from Scala/JVM client to Server

2023-03-02 Thread via GitHub


vicennial commented on code in PR #40256:
URL: https://github.com/apache/spark/pull/40256#discussion_r1123897616


##
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/ArtifactSuite.scala:
##
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.client
+
+import java.io.InputStream
+import java.nio.file.{Files, Path, Paths}
+import java.util.concurrent.TimeUnit
+import java.util.zip.{CheckedInputStream, CRC32}
+
+import com.google.protobuf.ByteString
+import io.grpc.{ManagedChannel, Server}
+import io.grpc.inprocess.{InProcessChannelBuilder, InProcessServerBuilder}
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.AddArtifactsRequest
+import org.apache.spark.sql.connect.client.util.ConnectFunSuite
+
+class ArtifactSuite extends ConnectFunSuite with BeforeAndAfterEach {
+
+  private var client: SparkConnectClient = _
+  private var service: DummySparkConnectService = _
+  private var server: Server = _
+  private var artifactManager: ArtifactManager = _
+  private var channel: ManagedChannel = _
+
+  private def startDummyServer(): Unit = {
+service = new DummySparkConnectService()
+server = InProcessServerBuilder
+  .forName(getClass.getName)
+  .addService(service)
+  .build()
+server.start()
+  }
+
+  private def createArtifactManager(): Unit = {
+channel = 
InProcessChannelBuilder.forName(getClass.getName).directExecutor().build()
+artifactManager = new 
ArtifactManager(proto.UserContext.newBuilder().build(), channel)
+  }
+
+  override def beforeEach(): Unit = {
+super.beforeEach()
+startDummyServer()
+createArtifactManager()
+client = null
+  }
+
+  override def afterEach(): Unit = {
+if (server != null) {
+  server.shutdownNow()
+  assert(server.awaitTermination(5, TimeUnit.SECONDS), "server failed to 
shutdown")
+}
+
+if (channel != null) {
+  channel.shutdownNow()
+}
+
+if (client != null) {
+  client.shutdown()
+}
+  }
+
+  private val CHUNK_SIZE: Int = 32 * 1024
+  protected def artifactFilePath: Path = 
baseResourcePath.resolve("artifact-tests")
+
+  /**
+   * Check if the data sent to the server (stored in `artifactChunk`) is 
equivalent to the local
+   * data at `localPath`.
+   * @param artifactChunk
+   * @param localPath
+   */
+  private def assertFileDataEquality(
+  artifactChunk: AddArtifactsRequest.ArtifactChunk,
+  localPath: Path): Unit = {
+val in = new CheckedInputStream(Files.newInputStream(localPath), new CRC32)
+val localData = ByteString.readFrom(in)
+assert(artifactChunk.getData == localData)
+assert(artifactChunk.getCrc == in.getChecksum.getValue)
+  }
+
+  private def singleChunkArtifactTest(path: String): Unit = {
+test(s"Single Chunk Artifact - $path") {
+  val artifactPath = artifactFilePath.resolve(path)
+  artifactManager.addArtifact(artifactPath.toString)
+
+  val receivedRequests = service.getAndClearLatestAddArtifactRequests()
+  // Single `AddArtifactRequest`
+  assert(receivedRequests.size == 1)
+
+  val request = receivedRequests.head
+  assert(request.hasBatch)
+
+  val batch = request.getBatch
+  // Single artifact in batch
+  assert(batch.getArtifactsList.size() == 1)
+
+  val singleChunkArtifact = batch.getArtifacts(0)
+  val namePrefix = artifactPath.getFileName.toString match {
+case jar if jar.endsWith(".jar") => "jars"
+case cf if cf.endsWith(".class") => "classes"
+  }
+  assert(singleChunkArtifact.getName.equals(namePrefix + "/" + path))
+  assertFileDataEquality(singleChunkArtifact.getData, artifactPath)
+}
+  }
+
+  singleChunkArtifactTest("smallClassFile.class")
+
+  singleChunkArtifactTest("smallJar.jar")
+
+  private def readNextChunk(in: InputStream): ByteString = {
+val buf = new Array[Byte](CHUNK_SIZE)
+var bytesRead = 0
+var count = 0
+while (count != -1 && bytesRead < CHUNK_SIZE) {
+  count = in.read(buf, bytesRead, CHUNK_SIZE - bytesRead)
+  if (count != -1) {
+bytesRead += count
+  }
+}
+  

[GitHub] [spark] mridulm commented on pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache

2023-03-02 Thread via GitHub


mridulm commented on PR #39459:
URL: https://github.com/apache/spark/pull/39459#issuecomment-1452754371

   Merged to master.
   Thanks for fixing this @ivoson !
   Thanks for the reviews @Ngone51 :-)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] mridulm closed pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache

2023-03-02 Thread via GitHub


mridulm closed pull request #39459: [SPARK-41497][CORE] Fixing accumulator 
undercount in the case of the retry task with rdd cache
URL: https://github.com/apache/spark/pull/39459


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] ueshin commented on a diff in pull request #40210: [SPARK-42615][CONNECT][PYTHON] Refactor the AnalyzePlan RPC and add `session.version`

2023-03-02 Thread via GitHub


ueshin commented on code in PR #40210:
URL: https://github.com/apache/spark/pull/40210#discussion_r1123891426


##
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAnalyzeHandler.scala:
##
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import scala.collection.JavaConverters._
+
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.connect.proto
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{Dataset, SparkSession}
+import org.apache.spark.sql.connect.common.{DataTypeProtoConverter, 
InvalidPlanInput}
+import org.apache.spark.sql.connect.planner.SparkConnectPlanner
+import org.apache.spark.sql.execution.{CodegenMode, CostMode, ExtendedMode, 
FormattedMode, SimpleMode}
+
+private[connect] class SparkConnectAnalyzeHandler(
+responseObserver: StreamObserver[proto.AnalyzePlanResponse])
+extends Logging {
+
+  def handle(request: proto.AnalyzePlanRequest): Unit = {
+val session =
+  SparkConnectService
+.getOrCreateIsolatedSession(request.getUserContext.getUserId, 
request.getClientId)
+.session
+
+val response = process(request, session)
+responseObserver.onNext(response)
+responseObserver.onCompleted()

Review Comment:
   I guess we should surround this with `session.withActive`?
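   
   A minimal sketch of that suggestion (assuming the handler keeps the shape quoted above; `withActive` is the existing `SparkSession` helper that temporarily installs this session as the active one):
   
   def handle(request: proto.AnalyzePlanRequest): Unit = {
 val session =
   SparkConnectService
 .getOrCreateIsolatedSession(request.getUserContext.getUserId, request.getClientId)
 .session
 // Run the analysis and send the response while this session is the active one.
 session.withActive {
   val response = process(request, session)
   responseObserver.onNext(response)
   responseObserver.onCompleted()
 }
   }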



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] ueshin opened a new pull request, #40260: [SPARK-42630][CONNECT][PYTHON] Delay parsing DDL string until SparkConnectClient is available

2023-03-02 Thread via GitHub


ueshin opened a new pull request, #40260:
URL: https://github.com/apache/spark/pull/40260

   ### What changes were proposed in this pull request?
   
   Delays parsing the DDL string for Python UDFs until a `SparkConnectClient` is available.
   
   Also changes `createDataFrame` to use the proto `DDLParse`.
   
   ### Why are the changes needed?
   
   Currently `parse_data_type` depends on a `PySparkSession` that creates a local PySpark session, but that won't be available on the client side.
   
   When a `SparkConnectClient` is available, we can use the new proto `DDLParse` to parse data types that are given as strings.
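   
   For reference, the parsing the server does for a DDL string corresponds roughly to `DataType.fromDDL` (this is an illustration of what the RPC returns, not the exact implementation):
   
   import org.apache.spark.sql.types.DataType
   
   // Parse a DDL-formatted schema string into a DataType (here a StructType).
   val parsed = DataType.fromDDL("id BIGINT, name STRING")
   println(parsed.json)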
   
   ### Does this PR introduce _any_ user-facing change?
   
   The UDF's `returnType` attribute can be a string in Spark Connect if it is provided as a string.
   
   ### How was this patch tested?
   
   Existing tests.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] vicennial commented on a diff in pull request #40256: [SPARK-42653][CONNECT] Artifact transfer from Scala/JVM client to Server

2023-03-02 Thread via GitHub


vicennial commented on code in PR #40256:
URL: https://github.com/apache/spark/pull/40256#discussion_r1123833306


##
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala:
##
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.client
+
+import java.io.{ByteArrayInputStream, InputStream}
+import java.net.URI
+import java.nio.file.{Files, Path, Paths}
+import java.util.concurrent.CopyOnWriteArrayList
+import java.util.zip.{CheckedInputStream, CRC32}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.concurrent.Promise
+import scala.concurrent.duration.Duration
+import scala.util.control.NonFatal
+
+import Artifact._
+import com.google.protobuf.ByteString
+import io.grpc.ManagedChannel
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.AddArtifactsResponse
+import org.apache.spark.connect.proto.AddArtifactsResponse.ArtifactSummary
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+/**
+ * The Artifact Manager is responsible for handling and transferring artifacts 
from the local
+ * client to the server (local/remote).
+ * @param userContext
+ * @param channel
+ */
+class ArtifactManager(userContext: proto.UserContext, channel: ManagedChannel) 
{
+  // Using the midpoint recommendation of 32KiB for chunk size as specified in
+  // https://github.com/grpc/grpc.github.io/issues/371.
+  private val CHUNK_SIZE: Int = 32 * 1024
+
+  private[this] val stub = proto.SparkConnectServiceGrpc.newStub(channel)
+  private[this] val classFinders = new CopyOnWriteArrayList[ClassFinder]
+
+  /**
+   * Register a [[ClassFinder]] for dynamically generated classes.
+   */
+  def register(finder: ClassFinder): Unit = classFinders.add(finder)
+
+  /**
+   * Add a single artifact to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifact(path: String): Unit = {
+addArtifact(Utils.resolveURI(path))
+  }
+
+  private def parseArtifacts(uri: URI): Seq[Artifact] = {
+// Currently only local files with extensions .jar and .class are 
supported.
+uri.getScheme match {
+  case "file" =>
+val path = Paths.get(uri)
+val artifact = path.getFileName.toString match {
+  case jar if jar.endsWith(".jar") =>
+newJarArtifact(path.getFileName, new LocalFile(path))
+  case cf if cf.endsWith(".class") =>
+newClassArtifact(path.getFileName, new LocalFile(path))
+  case other =>
+throw new UnsupportedOperationException(s"Unsuppoted file format: 
$other")
+}
+Seq[Artifact](artifact)
+
+  case other =>
+throw new UnsupportedOperationException(s"Unsupported scheme: $other")
+}
+  }
+
+  /**
+   * Add a single artifact to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifact(uri: URI): Unit = addArtifacts(parseArtifacts(uri))
+
+  /**
+   * Ensure that all classfile artifacts have been uploaded to the server, and 
are ready for use.
+   */
+  private[client] def ensureAllClassFileArtifactsUploaded(): Unit = {
+addArtifacts(classFinders.asScala.flatMap(_.findClasses()))
+  }
+
+  /**
+   * Add multiple artifacts to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifacts(uris: Seq[URI]): Unit = 
addArtifacts(uris.flatMap(parseArtifacts))
+
+  /**
+   * Add a number of artifacts to the session.
+   */
+  private def addArtifacts(artifacts: Iterable[Artifact]): Unit = {
+val promise = Promise[Seq[ArtifactSummary]]
+val responseHandler = new StreamObserver[proto.AddArtifactsResponse] {
+  private val summaries = mutable.Buffer.empty[ArtifactSummary]
+  override def onNext(v: AddArtifactsResponse): Unit = {
+v.getArtifactsList.forEach { summary =>
+  summaries += summary
+}
+  }
+  override def onError(throwable: Throwable): Unit = {
+promise.failure(throwable)
+  }
+  override def onComple

[GitHub] [spark] vicennial commented on a diff in pull request #40256: [SPARK-42653][CONNECT] Artifact transfer from Scala/JVM client to Server

2023-03-02 Thread via GitHub


vicennial commented on code in PR #40256:
URL: https://github.com/apache/spark/pull/40256#discussion_r1123829338


##
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala:
##
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.client
+
+import java.io.{ByteArrayInputStream, InputStream}
+import java.net.URI
+import java.nio.file.{Files, Path, Paths}
+import java.util.concurrent.CopyOnWriteArrayList
+import java.util.zip.{CheckedInputStream, CRC32}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.concurrent.Promise
+import scala.concurrent.duration.Duration
+import scala.util.control.NonFatal
+
+import Artifact._
+import com.google.protobuf.ByteString
+import io.grpc.ManagedChannel
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.AddArtifactsResponse
+import org.apache.spark.connect.proto.AddArtifactsResponse.ArtifactSummary
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+/**
+ * The Artifact Manager is responsible for handling and transferring artifacts 
from the local
+ * client to the server (local/remote).
+ * @param userContext
+ * @param channel
+ */
+class ArtifactManager(userContext: proto.UserContext, channel: ManagedChannel) 
{
+  // Using the midpoint recommendation of 32KiB for chunk size as specified in
+  // https://github.com/grpc/grpc.github.io/issues/371.
+  private val CHUNK_SIZE: Int = 32 * 1024
+
+  private[this] val stub = proto.SparkConnectServiceGrpc.newStub(channel)
+  private[this] val classFinders = new CopyOnWriteArrayList[ClassFinder]
+
+  /**
+   * Register a [[ClassFinder]] for dynamically generated classes.
+   */
+  def register(finder: ClassFinder): Unit = classFinders.add(finder)
+
+  /**
+   * Add a single artifact to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifact(path: String): Unit = {
+addArtifact(Utils.resolveURI(path))
+  }
+
+  private def parseArtifacts(uri: URI): Seq[Artifact] = {
+// Currently only local files with extensions .jar and .class are 
supported.
+uri.getScheme match {
+  case "file" =>
+val path = Paths.get(uri)
+val artifact = path.getFileName.toString match {
+  case jar if jar.endsWith(".jar") =>
+newJarArtifact(path.getFileName, new LocalFile(path))
+  case cf if cf.endsWith(".class") =>
+newClassArtifact(path.getFileName, new LocalFile(path))
+  case other =>
+throw new UnsupportedOperationException(s"Unsuppoted file format: 
$other")
+}
+Seq[Artifact](artifact)
+
+  case other =>
+throw new UnsupportedOperationException(s"Unsupported scheme: $other")
+}
+  }
+
+  /**
+   * Add a single artifact to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifact(uri: URI): Unit = addArtifacts(parseArtifacts(uri))
+
+  /**
+   * Ensure that all classfile artifacts have been uploaded to the server, and 
are ready for use.
+   */
+  private[client] def ensureAllClassFileArtifactsUploaded(): Unit = {
+addArtifacts(classFinders.asScala.flatMap(_.findClasses()))
+  }
+
+  /**
+   * Add multiple artifacts to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifacts(uris: Seq[URI]): Unit = 
addArtifacts(uris.flatMap(parseArtifacts))
+
+  /**
+   * Add a number of artifacts to the session.
+   */
+  private def addArtifacts(artifacts: Iterable[Artifact]): Unit = {
+val promise = Promise[Seq[ArtifactSummary]]
+val responseHandler = new StreamObserver[proto.AddArtifactsResponse] {
+  private val summaries = mutable.Buffer.empty[ArtifactSummary]
+  override def onNext(v: AddArtifactsResponse): Unit = {
+v.getArtifactsList.forEach { summary =>
+  summaries += summary
+}
+  }
+  override def onError(throwable: Throwable): Unit = {
+promise.failure(throwable)
+  }
+  override def onComple

[GitHub] [spark] vicennial commented on a diff in pull request #40256: [SPARK-42653][CONNECT] Artifact transfer from Scala/JVM client to Server

2023-03-02 Thread via GitHub


vicennial commented on code in PR #40256:
URL: https://github.com/apache/spark/pull/40256#discussion_r1123821569


##
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala:
##
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.client
+
+import java.io.{ByteArrayInputStream, InputStream}
+import java.net.URI
+import java.nio.file.{Files, Path, Paths}
+import java.util.concurrent.CopyOnWriteArrayList
+import java.util.zip.{CheckedInputStream, CRC32}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.concurrent.Promise
+import scala.concurrent.duration.Duration
+import scala.util.control.NonFatal
+
+import Artifact._
+import com.google.protobuf.ByteString
+import io.grpc.ManagedChannel
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.AddArtifactsResponse
+import org.apache.spark.connect.proto.AddArtifactsResponse.ArtifactSummary
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+/**
+ * The Artifact Manager is responsible for handling and transferring artifacts 
from the local
+ * client to the server (local/remote).
+ * @param userContext
+ * @param channel
+ */
+class ArtifactManager(userContext: proto.UserContext, channel: ManagedChannel) 
{
+  // Using the midpoint recommendation of 32KiB for chunk size as specified in
+  // https://github.com/grpc/grpc.github.io/issues/371.
+  private val CHUNK_SIZE: Int = 32 * 1024
+
+  private[this] val stub = proto.SparkConnectServiceGrpc.newStub(channel)
+  private[this] val classFinders = new CopyOnWriteArrayList[ClassFinder]
+
+  /**
+   * Register a [[ClassFinder]] for dynamically generated classes.
+   */
+  def register(finder: ClassFinder): Unit = classFinders.add(finder)
+
+  /**
+   * Add a single artifact to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifact(path: String): Unit = {
+addArtifact(Utils.resolveURI(path))
+  }
+
+  private def parseArtifacts(uri: URI): Seq[Artifact] = {
+// Currently only local files with extensions .jar and .class are 
supported.
+uri.getScheme match {
+  case "file" =>
+val path = Paths.get(uri)
+val artifact = path.getFileName.toString match {
+  case jar if jar.endsWith(".jar") =>
+newJarArtifact(path.getFileName, new LocalFile(path))
+  case cf if cf.endsWith(".class") =>
+newClassArtifact(path.getFileName, new LocalFile(path))
+  case other =>
+throw new UnsupportedOperationException(s"Unsuppoted file format: 
$other")
+}
+Seq[Artifact](artifact)
+
+  case other =>
+throw new UnsupportedOperationException(s"Unsupported scheme: $other")
+}
+  }
+
+  /**
+   * Add a single artifact to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifact(uri: URI): Unit = addArtifacts(parseArtifacts(uri))
+
+  /**
+   * Ensure that all classfile artifacts have been uploaded to the server, and 
are ready for use.
+   */
+  private[client] def ensureAllClassFileArtifactsUploaded(): Unit = {
+addArtifacts(classFinders.asScala.flatMap(_.findClasses()))
+  }
+
+  /**
+   * Add multiple artifacts to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifacts(uris: Seq[URI]): Unit = 
addArtifacts(uris.flatMap(parseArtifacts))
+
+  /**
+   * Add a number of artifacts to the session.
+   */
+  private def addArtifacts(artifacts: Iterable[Artifact]): Unit = {
+val promise = Promise[Seq[ArtifactSummary]]
+val responseHandler = new StreamObserver[proto.AddArtifactsResponse] {
+  private val summaries = mutable.Buffer.empty[ArtifactSummary]
+  override def onNext(v: AddArtifactsResponse): Unit = {
+v.getArtifactsList.forEach { summary =>
+  summaries += summary
+}
+  }
+  override def onError(throwable: Throwable): Unit = {
+promise.failure(throwable)
+  }
+  override def onComple

[GitHub] [spark] vicennial commented on a diff in pull request #40256: [SPARK-42653][CONNECT] Artifact transfer from Scala/JVM client to Server

2023-03-02 Thread via GitHub


vicennial commented on code in PR #40256:
URL: https://github.com/apache/spark/pull/40256#discussion_r1123821210


##
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala:
##
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.client
+
+import java.io.{ByteArrayInputStream, InputStream}
+import java.net.URI
+import java.nio.file.{Files, Path, Paths}
+import java.util.concurrent.CopyOnWriteArrayList
+import java.util.zip.{CheckedInputStream, CRC32}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.concurrent.Promise
+import scala.concurrent.duration.Duration
+import scala.util.control.NonFatal
+
+import Artifact._
+import com.google.protobuf.ByteString
+import io.grpc.ManagedChannel
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.AddArtifactsResponse
+import org.apache.spark.connect.proto.AddArtifactsResponse.ArtifactSummary
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+/**
+ * The Artifact Manager is responsible for handling and transferring artifacts 
from the local
+ * client to the server (local/remote).
+ * @param userContext
+ * @param channel
+ */
+class ArtifactManager(userContext: proto.UserContext, channel: ManagedChannel) 
{
+  // Using the midpoint recommendation of 32KiB for chunk size as specified in
+  // https://github.com/grpc/grpc.github.io/issues/371.
+  private val CHUNK_SIZE: Int = 32 * 1024
+
+  private[this] val stub = proto.SparkConnectServiceGrpc.newStub(channel)
+  private[this] val classFinders = new CopyOnWriteArrayList[ClassFinder]
+
+  /**
+   * Register a [[ClassFinder]] for dynamically generated classes.
+   */
+  def register(finder: ClassFinder): Unit = classFinders.add(finder)
+
+  /**
+   * Add a single artifact to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifact(path: String): Unit = {
+addArtifact(Utils.resolveURI(path))
+  }
+
+  private def parseArtifacts(uri: URI): Seq[Artifact] = {
+// Currently only local files with extensions .jar and .class are 
supported.
+uri.getScheme match {
+  case "file" =>
+val path = Paths.get(uri)
+val artifact = path.getFileName.toString match {
+  case jar if jar.endsWith(".jar") =>
+newJarArtifact(path.getFileName, new LocalFile(path))
+  case cf if cf.endsWith(".class") =>
+newClassArtifact(path.getFileName, new LocalFile(path))
+  case other =>
+throw new UnsupportedOperationException(s"Unsuppoted file format: 
$other")
+}
+Seq[Artifact](artifact)
+
+  case other =>
+throw new UnsupportedOperationException(s"Unsupported scheme: $other")
+}
+  }
+
+  /**
+   * Add a single artifact to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifact(uri: URI): Unit = addArtifacts(parseArtifacts(uri))
+
+  /**
+   * Ensure that all classfile artifacts have been uploaded to the server, and 
are ready for use.
+   */
+  private[client] def ensureAllClassFileArtifactsUploaded(): Unit = {
+addArtifacts(classFinders.asScala.flatMap(_.findClasses()))
+  }
+
+  /**
+   * Add multiple artifacts to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifacts(uris: Seq[URI]): Unit = 
addArtifacts(uris.flatMap(parseArtifacts))
+
+  /**
+   * Add a number of artifacts to the session.
+   */
+  private def addArtifacts(artifacts: Iterable[Artifact]): Unit = {
+val promise = Promise[Seq[ArtifactSummary]]
+val responseHandler = new StreamObserver[proto.AddArtifactsResponse] {
+  private val summaries = mutable.Buffer.empty[ArtifactSummary]
+  override def onNext(v: AddArtifactsResponse): Unit = {
+v.getArtifactsList.forEach { summary =>
+  summaries += summary
+}
+  }
+  override def onError(throwable: Throwable): Unit = {
+promise.failure(throwable)
+  }
+  override def onComple

[GitHub] [spark] zhenlineo commented on a diff in pull request #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client

2023-03-02 Thread via GitHub


zhenlineo commented on code in PR #40257:
URL: https://github.com/apache/spark/pull/40257#discussion_r1123818828


##
bin/spark-connect:
##
@@ -0,0 +1,15 @@
+#!/usr/bin/env bash
+
+if [ -z "${SPARK_HOME}" ]; then
+  source "$(dirname "$0")"/find-spark-home
+fi
+
+# Build the jars needed for spark submit
+#build/sbt package
+
+# Build the jars needed for spark connect
+build/sbt "connect/assembly"
+
+CONNECT_JAR=`ls 
"${SPARK_HOME}"/connector/connect/server/target/scala*/spark-connect-assembly*.jar
 | paste -sd ',' -`
+
+exec "${SPARK_HOME}"/bin/spark-submit --class 
org.apache.spark.sql.connect.SimpleSparkConnectService "$CONNECT_JAR"

Review Comment:
   Where is the other script?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] amaliujia commented on pull request #40259: [SPARK-42609][CONNECT] Add tests for grouping() and grouping_id() functions

2023-03-02 Thread via GitHub


amaliujia commented on PR #40259:
URL: https://github.com/apache/spark/pull/40259#issuecomment-1452625458

   @hvanhovell 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] amaliujia opened a new pull request, #40259: [SPARK-42609][CONNECT] Add tests for grouping() and grouping_id() functions

2023-03-02 Thread via GitHub


amaliujia opened a new pull request, #40259:
URL: https://github.com/apache/spark/pull/40259

   
   
   ### What changes were proposed in this pull request?
   
   Add tests for grouping() and grouping_id() functions.
   
   ### Why are the changes needed?
   
   Improve testing coverage.
   
   ### Does this PR introduce _any_ user-facing change?
   
   NO
   
   ### How was this patch tested?
   
   UT
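   
   As a reference for what these functions do (generic Spark SQL usage, not the test code added in this PR):
   
   import org.apache.spark.sql.functions.{grouping, grouping_id, sum}
   import spark.implicits._  // assumes an active SparkSession named `spark`
   
   val df = Seq(("a", "x", 1), ("a", "y", 2), ("b", "x", 3)).toDF("k1", "k2", "v")
   
   // grouping("k1") is 1 in CUBE/ROLLUP rows where k1 is aggregated away, 0 otherwise;
   // grouping_id() packs those flags for all grouping columns into one bit vector.
   df.cube("k1", "k2")
 .agg(grouping("k1"), grouping("k2"), grouping_id(), sum("v"))
 .show()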


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] hvanhovell commented on a diff in pull request #40238: [SPARK-42633][CONNECT] Make LocalRelation take an actual schema

2023-03-02 Thread via GitHub


hvanhovell commented on code in PR #40238:
URL: https://github.com/apache/spark/pull/40238#discussion_r1123698953


##
connector/connect/common/src/main/protobuf/spark/connect/relations.proto:
##
@@ -353,11 +353,16 @@ message LocalRelation {
   optional bytes data = 1;
 
   // (Optional) The schema of local data.
-  // It should be either a DDL-formatted type string or a JSON string.
   //
   // The server side will update the column names and data types according to 
this schema.
   // If the 'data' is not provided, then this schema will be required.
-  optional string schema = 2;
+  oneof schema {
+// Either a DDL-formatted type string or a JSON string.
+string schemaString = 2;

Review Comment:
   thanks; it's been a while since I touched proto.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] hvanhovell commented on a diff in pull request #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client

2023-03-02 Thread via GitHub


hvanhovell commented on code in PR #40257:
URL: https://github.com/apache/spark/pull/40257#discussion_r1123697756


##
bin/spark-connect-scala-client:
##
@@ -0,0 +1,26 @@
+#!/usr/bin/env bash

Review Comment:
   Script files need to have a license as well



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] hvanhovell commented on a diff in pull request #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client

2023-03-02 Thread via GitHub


hvanhovell commented on code in PR #40257:
URL: https://github.com/apache/spark/pull/40257#discussion_r1123696037


##
bin/spark-connect-scala-client.sc:
##
@@ -0,0 +1,15 @@
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.SparkSession
+
+val conStr = if (sys.env.contains("SPARK_REMOTE")) sys.env("SPARK_REMOTE") 
else ""
+val sessionBuilder = SparkSession.builder()
+val spark = if (conStr.isEmpty) sessionBuilder.build() else 
sessionBuilder.remote(conStr).build()

Review Comment:
   Can you also set the user agent to something meaningful?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] hvanhovell commented on a diff in pull request #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client

2023-03-02 Thread via GitHub


hvanhovell commented on code in PR #40257:
URL: https://github.com/apache/spark/pull/40257#discussion_r1123695202


##
bin/spark-connect-scala-client.sc:
##
@@ -0,0 +1,15 @@
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.SparkSession
+
+val conStr = if (sys.env.contains("SPARK_REMOTE")) sys.env("SPARK_REMOTE") 
else ""
+val sessionBuilder = SparkSession.builder()
+val spark = if (conStr.isEmpty) sessionBuilder.build() else 
sessionBuilder.remote(conStr).build()

Review Comment:
   Oh BTW, there does not seem to be a SparkSession.Builder.remote() function?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] hvanhovell commented on a diff in pull request #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client

2023-03-02 Thread via GitHub


hvanhovell commented on code in PR #40257:
URL: https://github.com/apache/spark/pull/40257#discussion_r1123688156


##
bin/spark-connect-scala-client.sc:
##
@@ -0,0 +1,15 @@
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.SparkSession
+
+val conStr = if (sys.env.contains("SPARK_REMOTE")) sys.env("SPARK_REMOTE") 
else ""

Review Comment:
   Feature request: can we try to configure this using a `--connectionString` argument?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] hvanhovell closed pull request #40241: [SPARK-42640][CONNECT] Remove stale entries from the excluding rules for CompatibilitySuite

2023-03-02 Thread via GitHub


hvanhovell closed pull request #40241: [SPARK-42640][CONNECT] Remove stale 
entries from the excluding rules for CompatibilitySuite
URL: https://github.com/apache/spark/pull/40241


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] hvanhovell commented on pull request #40241: [SPARK-42640][CONNECT] Remove stale entries from the excluding rules for CompatibilitySuite

2023-03-02 Thread via GitHub


hvanhovell commented on PR #40241:
URL: https://github.com/apache/spark/pull/40241#issuecomment-1452528841

   Merging.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] hvanhovell commented on a diff in pull request #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client

2023-03-02 Thread via GitHub


hvanhovell commented on code in PR #40257:
URL: https://github.com/apache/spark/pull/40257#discussion_r1123661416


##
bin/spark-connect:
##
@@ -0,0 +1,15 @@
+#!/usr/bin/env bash
+
+if [ -z "${SPARK_HOME}" ]; then
+  source "$(dirname "$0")"/find-spark-home
+fi
+
+# Build the jars needed for spark submit
+#build/sbt package

Review Comment:
   Remove comment?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] hvanhovell commented on a diff in pull request #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client

2023-03-02 Thread via GitHub


hvanhovell commented on code in PR #40257:
URL: https://github.com/apache/spark/pull/40257#discussion_r1123660107


##
bin/spark-connect-scala-client:
##
@@ -0,0 +1,26 @@
+#!/usr/bin/env bash
+
+# Use the spark connect JVM client to connect to a spark connect server.
+#
+# Start a local server:
+# A local spark-connect server with default settings can be started using the 
following command:
+#  `bin/spark-connect`
+# The client should be able to connect to this server directly with the 
default client settings.
+#
+# Connect to a remote server:
+# To connect to a remote server, use env var `SPARK_REMOTE` to configure the 
client connection
+# string. e.g.
+#  `export SPARK_REMOTE="sc://<host>:<port>/;token=<token>;<param>=<value>"`
+
+if [ -z "${SPARK_HOME}" ]; then
+  source "$(dirname "$0")"/find-spark-home
+fi
+
+# Build the jars needed for spark connect JVM client
+build/sbt "sql/package;connect-client-jvm/assembly"
+
+CONNECT_CLASSPATH="$(build/sbt -DcopyDependencies=false "export 
connect-client-jvm/fullClasspath" | grep jar | tail -n1)"
+SQL_CLASSPATH="$(build/sbt -DcopyDependencies=false "export sql/fullClasspath" 
| grep jar | tail -n1)"
+
+INIT_SCRIPT="${SPARK_HOME}"/bin/spark-connect-scala-client.sc
+"${SPARK_HOME}"/build/scala*/bin/scala -cp "$CONNECT_CLASSPATH:$SQL_CLASSPATH" 
-i $INIT_SCRIPT

Review Comment:
   TIL you can do this :)...
   
   I have multiple scala versions floating around in my spark dir. We should 
pick the current one...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] hvanhovell commented on a diff in pull request #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client

2023-03-02 Thread via GitHub


hvanhovell commented on code in PR #40257:
URL: https://github.com/apache/spark/pull/40257#discussion_r1123653670


##
bin/spark-connect:
##
@@ -0,0 +1,15 @@
+#!/usr/bin/env bash
+
+if [ -z "${SPARK_HOME}" ]; then
+  source "$(dirname "$0")"/find-spark-home
+fi
+
+# Build the jars needed for spark submit
+#build/sbt package
+
+# Build the jars needed for spark connect
+build/sbt "connect/assembly"
+
+CONNECT_JAR=`ls 
"${SPARK_HOME}"/connector/connect/server/target/scala*/spark-connect-assembly*.jar
 | paste -sd ',' -`
+
+exec "${SPARK_HOME}"/bin/spark-submit --class 
org.apache.spark.sql.connect.SimpleSparkConnectService "$CONNECT_JAR"

Review Comment:
   We have two scripts for starting and stopping a connect server. Should we 
use those instead? If this is for dev, then we should keep these.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] hvanhovell commented on a diff in pull request #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client

2023-03-02 Thread via GitHub


hvanhovell commented on code in PR #40257:
URL: https://github.com/apache/spark/pull/40257#discussion_r1123654587


##
bin/spark-connect:
##
@@ -0,0 +1,15 @@
+#!/usr/bin/env bash

Review Comment:
   Since these scripts are aimed at development, maybe put them in connector/connect/bin?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org




[GitHub] [spark] hvanhovell commented on a diff in pull request #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client

2023-03-02 Thread via GitHub


hvanhovell commented on code in PR #40257:
URL: https://github.com/apache/spark/pull/40257#discussion_r1123653086


##
bin/spark-connect-scala-client.sc:
##
@@ -0,0 +1,15 @@
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.SparkSession
+
+val conStr = if (sys.env.contains("SPARK_REMOTE")) sys.env("SPARK_REMOTE") 
else ""
+val sessionBuilder = SparkSession.builder()
+val spark = if (conStr.isEmpty) sessionBuilder.build() else 
sessionBuilder.remote(conStr).build()

Review Comment:
   Do we default to a local connection if none is set?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] shrprasa opened a new pull request, #40258: [WIP][SPARK-42655]:Incorrect ambiguous column reference error

2023-03-02 Thread via GitHub


shrprasa opened a new pull request, #40258:
URL: https://github.com/apache/spark/pull/40258

   Incorrect ambiguous column reference error


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] hvanhovell commented on a diff in pull request #40256: [SPARK-42653][CONNECT] Artifact transfer from Scala/JVM client to Server

2023-03-02 Thread via GitHub


hvanhovell commented on code in PR #40256:
URL: https://github.com/apache/spark/pull/40256#discussion_r1123642955


##
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/ArtifactSuite.scala:
##
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.client
+
+import java.io.InputStream
+import java.nio.file.{Files, Path, Paths}
+import java.util.concurrent.TimeUnit
+import java.util.zip.{CheckedInputStream, CRC32}
+
+import com.google.protobuf.ByteString
+import io.grpc.{ManagedChannel, Server}
+import io.grpc.inprocess.{InProcessChannelBuilder, InProcessServerBuilder}
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.AddArtifactsRequest
+import org.apache.spark.sql.connect.client.util.ConnectFunSuite
+
+class ArtifactSuite extends ConnectFunSuite with BeforeAndAfterEach {
+
+  private var client: SparkConnectClient = _
+  private var service: DummySparkConnectService = _
+  private var server: Server = _
+  private var artifactManager: ArtifactManager = _
+  private var channel: ManagedChannel = _
+
+  private def startDummyServer(): Unit = {
+service = new DummySparkConnectService()
+server = InProcessServerBuilder
+  .forName(getClass.getName)
+  .addService(service)
+  .build()
+server.start()
+  }
+
+  private def createArtifactManager(): Unit = {
+channel = 
InProcessChannelBuilder.forName(getClass.getName).directExecutor().build()
+artifactManager = new 
ArtifactManager(proto.UserContext.newBuilder().build(), channel)
+  }
+
+  override def beforeEach(): Unit = {
+super.beforeEach()
+startDummyServer()
+createArtifactManager()
+client = null
+  }
+
+  override def afterEach(): Unit = {
+if (server != null) {
+  server.shutdownNow()
+  assert(server.awaitTermination(5, TimeUnit.SECONDS), "server failed to 
shutdown")
+}
+
+if (channel != null) {
+  channel.shutdownNow()
+}
+
+if (client != null) {
+  client.shutdown()
+}
+  }
+
+  private val CHUNK_SIZE: Int = 32 * 1024
+  protected def artifactFilePath: Path = 
baseResourcePath.resolve("artifact-tests")
+
+  /**
+   * Check if the data sent to the server (stored in `artifactChunk`) is 
equivalent to the local
+   * data at `localPath`.
+   * @param artifactChunk
+   * @param localPath
+   */
+  private def assertFileDataEquality(
+  artifactChunk: AddArtifactsRequest.ArtifactChunk,
+  localPath: Path): Unit = {
+val in = new CheckedInputStream(Files.newInputStream(localPath), new CRC32)
+val localData = ByteString.readFrom(in)
+assert(artifactChunk.getData == localData)
+assert(artifactChunk.getCrc == in.getChecksum.getValue)
+  }
+
+  private def singleChunkArtifactTest(path: String): Unit = {
+test(s"Single Chunk Artifact - $path") {
+  val artifactPath = artifactFilePath.resolve(path)
+  artifactManager.addArtifact(artifactPath.toString)
+
+  val receivedRequests = service.getAndClearLatestAddArtifactRequests()
+  // Single `AddArtifactRequest`
+  assert(receivedRequests.size == 1)
+
+  val request = receivedRequests.head
+  assert(request.hasBatch)
+
+  val batch = request.getBatch
+  // Single artifact in batch
+  assert(batch.getArtifactsList.size() == 1)
+
+  val singleChunkArtifact = batch.getArtifacts(0)
+  val namePrefix = artifactPath.getFileName.toString match {
+case jar if jar.endsWith(".jar") => "jars"
+case cf if cf.endsWith(".class") => "classes"
+  }
+  assert(singleChunkArtifact.getName.equals(namePrefix + "/" + path))
+  assertFileDataEquality(singleChunkArtifact.getData, artifactPath)
+}
+  }
+
+  singleChunkArtifactTest("smallClassFile.class")
+
+  singleChunkArtifactTest("smallJar.jar")
+
+  private def readNextChunk(in: InputStream): ByteString = {
+val buf = new Array[Byte](CHUNK_SIZE)
+var bytesRead = 0
+var count = 0
+while (count != -1 && bytesRead < CHUNK_SIZE) {
+  count = in.read(buf, bytesRead, CHUNK_SIZE - bytesRead)
+  if (count != -1) {
+bytesRead += count
+  }
+}
+ 

[GitHub] [spark] zhenlineo opened a new pull request, #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client

2023-03-02 Thread via GitHub


zhenlineo opened a new pull request, #40257:
URL: https://github.com/apache/spark/pull/40257

   ### What changes were proposed in this pull request?
   Adds a simple script to start the Scala client in the Scala REPL, as well as a script to start the Spark Connect server for the client to connect to.
   
   ### Why are the changes needed?
   Makes the JVM client easier to use.
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   ### How was this patch tested?
   Manually tested.
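   
   For illustration, once the REPL comes up with a `spark` session bound, a quick smoke test could look like this (assuming the client already supports `sql` and `collect` at this point; the exact API surface may differ):
   
   // Run a trivial query against the connected server and print the result.
   val rows = spark.sql("select 1 as id").collect()
   rows.foreach(println)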


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] hvanhovell commented on a diff in pull request #40256: [SPARK-42653][CONNECT] Artifact transfer from Scala/JVM client to Server

2023-03-02 Thread via GitHub


hvanhovell commented on code in PR #40256:
URL: https://github.com/apache/spark/pull/40256#discussion_r1123591424


##
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala:
##
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.client
+
+import java.io.{ByteArrayInputStream, InputStream}
+import java.net.URI
+import java.nio.file.{Files, Path, Paths}
+import java.util.concurrent.CopyOnWriteArrayList
+import java.util.zip.{CheckedInputStream, CRC32}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.concurrent.Promise
+import scala.concurrent.duration.Duration
+import scala.util.control.NonFatal
+
+import Artifact._
+import com.google.protobuf.ByteString
+import io.grpc.ManagedChannel
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.AddArtifactsResponse
+import org.apache.spark.connect.proto.AddArtifactsResponse.ArtifactSummary
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+/**
+ * The Artifact Manager is responsible for handling and transferring artifacts 
from the local
+ * client to the server (local/remote).
+ * @param userContext
+ * @param channel
+ */
+class ArtifactManager(userContext: proto.UserContext, channel: ManagedChannel) 
{
+  // Using the midpoint recommendation of 32KiB for chunk size as specified in
+  // https://github.com/grpc/grpc.github.io/issues/371.
+  private val CHUNK_SIZE: Int = 32 * 1024
+
+  private[this] val stub = proto.SparkConnectServiceGrpc.newStub(channel)
+  private[this] val classFinders = new CopyOnWriteArrayList[ClassFinder]
+
+  /**
+   * Register a [[ClassFinder]] for dynamically generated classes.
+   */
+  def register(finder: ClassFinder): Unit = classFinders.add(finder)
+
+  /**
+   * Add a single artifact to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifact(path: String): Unit = {
+addArtifact(Utils.resolveURI(path))
+  }
+
+  private def parseArtifacts(uri: URI): Seq[Artifact] = {
+// Currently only local files with extensions .jar and .class are 
supported.
+uri.getScheme match {
+  case "file" =>
+val path = Paths.get(uri)
+val artifact = path.getFileName.toString match {
+  case jar if jar.endsWith(".jar") =>
+newJarArtifact(path.getFileName, new LocalFile(path))
+  case cf if cf.endsWith(".class") =>
+newClassArtifact(path.getFileName, new LocalFile(path))
+  case other =>
+throw new UnsupportedOperationException(s"Unsuppoted file format: 
$other")
+}
+Seq[Artifact](artifact)
+
+  case other =>
+throw new UnsupportedOperationException(s"Unsupported scheme: $other")
+}
+  }
+
+  /**
+   * Add a single artifact to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifact(uri: URI): Unit = addArtifacts(parseArtifacts(uri))
+
+  /**
+   * Ensure that all classfile artifacts have been uploaded to the server, and 
are ready for use.
+   */
+  private[client] def ensureAllClassFileArtifactsUploaded(): Unit = {
+addArtifacts(classFinders.asScala.flatMap(_.findClasses()))
+  }
+
+  /**
+   * Add multiple artifacts to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifacts(uris: Seq[URI]): Unit = 
addArtifacts(uris.flatMap(parseArtifacts))
+
+  /**
+   * Add a number of artifacts to the session.
+   */
+  private def addArtifacts(artifacts: Iterable[Artifact]): Unit = {
+val promise = Promise[Seq[ArtifactSummary]]
+val responseHandler = new StreamObserver[proto.AddArtifactsResponse] {
+  private val summaries = mutable.Buffer.empty[ArtifactSummary]
+  override def onNext(v: AddArtifactsResponse): Unit = {
+v.getArtifactsList.forEach { summary =>
+  summaries += summary
+}
+  }
+  override def onError(throwable: Throwable): Unit = {
+promise.failure(throwable)
+  }
+  override def onCompl

[GitHub] [spark] hvanhovell commented on a diff in pull request #40256: [SPARK-42653][CONNECT] Artifact transfer from Scala/JVM client to Server

2023-03-02 Thread via GitHub


hvanhovell commented on code in PR #40256:
URL: https://github.com/apache/spark/pull/40256#discussion_r1123590597


##
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala:
##
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.client
+
+import java.io.{ByteArrayInputStream, InputStream}
+import java.net.URI
+import java.nio.file.{Files, Path, Paths}
+import java.util.concurrent.CopyOnWriteArrayList
+import java.util.zip.{CheckedInputStream, CRC32}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.concurrent.Promise
+import scala.concurrent.duration.Duration
+import scala.util.control.NonFatal
+
+import Artifact._
+import com.google.protobuf.ByteString
+import io.grpc.ManagedChannel
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.AddArtifactsResponse
+import org.apache.spark.connect.proto.AddArtifactsResponse.ArtifactSummary
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+/**
+ * The Artifact Manager is responsible for handling and transferring artifacts 
from the local
+ * client to the server (local/remote).
+ * @param userContext
+ * @param channel
+ */
+class ArtifactManager(userContext: proto.UserContext, channel: ManagedChannel) 
{
+  // Using the midpoint recommendation of 32KiB for chunk size as specified in
+  // https://github.com/grpc/grpc.github.io/issues/371.
+  private val CHUNK_SIZE: Int = 32 * 1024
+
+  private[this] val stub = proto.SparkConnectServiceGrpc.newStub(channel)
+  private[this] val classFinders = new CopyOnWriteArrayList[ClassFinder]
+
+  /**
+   * Register a [[ClassFinder]] for dynamically generated classes.
+   */
+  def register(finder: ClassFinder): Unit = classFinders.add(finder)
+
+  /**
+   * Add a single artifact to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifact(path: String): Unit = {
+addArtifact(Utils.resolveURI(path))
+  }
+
+  private def parseArtifacts(uri: URI): Seq[Artifact] = {
+// Currently only local files with extensions .jar and .class are 
supported.
+uri.getScheme match {
+  case "file" =>
+val path = Paths.get(uri)
+val artifact = path.getFileName.toString match {
+  case jar if jar.endsWith(".jar") =>
+newJarArtifact(path.getFileName, new LocalFile(path))
+  case cf if cf.endsWith(".class") =>
+newClassArtifact(path.getFileName, new LocalFile(path))
+  case other =>
+throw new UnsupportedOperationException(s"Unsuppoted file format: 
$other")
+}
+Seq[Artifact](artifact)
+
+  case other =>
+throw new UnsupportedOperationException(s"Unsupported scheme: $other")
+}
+  }
+
+  /**
+   * Add a single artifact to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifact(uri: URI): Unit = addArtifacts(parseArtifacts(uri))
+
+  /**
+   * Ensure that all classfile artifacts have been uploaded to the server, and 
are ready for use.
+   */
+  private[client] def ensureAllClassFileArtifactsUploaded(): Unit = {
+addArtifacts(classFinders.asScala.flatMap(_.findClasses()))
+  }
+
+  /**
+   * Add multiple artifacts to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifacts(uris: Seq[URI]): Unit = addArtifacts(uris.flatMap(parseArtifacts))
+
+  /**
+   * Add a number of artifacts to the session.
+   */
+  private def addArtifacts(artifacts: Iterable[Artifact]): Unit = {
+val promise = Promise[Seq[ArtifactSummary]]
+val responseHandler = new StreamObserver[proto.AddArtifactsResponse] {
+  private val summaries = mutable.Buffer.empty[ArtifactSummary]
+  override def onNext(v: AddArtifactsResponse): Unit = {
+v.getArtifactsList.forEach { summary =>
+  summaries += summary
+}
+  }
+  override def onError(throwable: Throwable): Unit = {
+promise.failure(throwable)
+  }
+  override def onCompl
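
A rough, self-contained sketch of the chunking that CHUNK_SIZE and the CheckedInputStream/CRC32 imports above point to (object and method names are assumptions for illustration, not the PR's actual helper):

    import java.nio.file.{Files, Paths}
    import java.util.zip.CRC32

    import com.google.protobuf.ByteString

    object ChunkingSketch {
      // Midpoint of the 16-64 KiB range recommended for gRPC streaming messages.
      private val ChunkSize = 32 * 1024

      /** Split a local file into ChunkSize pieces and pair each piece with its CRC32. */
      def chunks(path: String): Seq[(ByteString, Long)] = {
        val bytes = Files.readAllBytes(Paths.get(path))
        bytes.grouped(ChunkSize).map { chunk =>
          val crc = new CRC32
          crc.update(chunk)
          (ByteString.copyFrom(chunk), crc.getValue)
        }.toSeq
      }
    }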

[GitHub] [spark] hvanhovell commented on a diff in pull request #40256: [SPARK-42653][CONNECT] Artifact transfer from Scala/JVM client to Server

2023-03-02 Thread via GitHub


hvanhovell commented on code in PR #40256:
URL: https://github.com/apache/spark/pull/40256#discussion_r1123590310


##
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala:
##
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.client
+
+import java.io.{ByteArrayInputStream, InputStream}
+import java.net.URI
+import java.nio.file.{Files, Path, Paths}
+import java.util.concurrent.CopyOnWriteArrayList
+import java.util.zip.{CheckedInputStream, CRC32}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.concurrent.Promise
+import scala.concurrent.duration.Duration
+import scala.util.control.NonFatal
+
+import Artifact._
+import com.google.protobuf.ByteString
+import io.grpc.ManagedChannel
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.AddArtifactsResponse
+import org.apache.spark.connect.proto.AddArtifactsResponse.ArtifactSummary
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+/**
+ * The Artifact Manager is responsible for handling and transferring artifacts from the local
+ * client to the server (local/remote).
+ * @param userContext
+ * @param channel
+ */
+class ArtifactManager(userContext: proto.UserContext, channel: ManagedChannel) {
+  // Using the midpoint recommendation of 32KiB for chunk size as specified in
+  // https://github.com/grpc/grpc.github.io/issues/371.
+  private val CHUNK_SIZE: Int = 32 * 1024
+
+  private[this] val stub = proto.SparkConnectServiceGrpc.newStub(channel)
+  private[this] val classFinders = new CopyOnWriteArrayList[ClassFinder]
+
+  /**
+   * Register a [[ClassFinder]] for dynamically generated classes.
+   */
+  def register(finder: ClassFinder): Unit = classFinders.add(finder)
+
+  /**
+   * Add a single artifact to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifact(path: String): Unit = {
+addArtifact(Utils.resolveURI(path))
+  }
+
+  private def parseArtifacts(uri: URI): Seq[Artifact] = {
+// Currently only local files with extensions .jar and .class are supported.
+uri.getScheme match {
+  case "file" =>
+val path = Paths.get(uri)
+val artifact = path.getFileName.toString match {
+  case jar if jar.endsWith(".jar") =>
+newJarArtifact(path.getFileName, new LocalFile(path))
+  case cf if cf.endsWith(".class") =>
+newClassArtifact(path.getFileName, new LocalFile(path))
+  case other =>
+throw new UnsupportedOperationException(s"Unsupported file format: $other")
+}
+Seq[Artifact](artifact)
+
+  case other =>
+throw new UnsupportedOperationException(s"Unsupported scheme: $other")
+}
+  }
+
+  /**
+   * Add a single artifact to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifact(uri: URI): Unit = addArtifacts(parseArtifacts(uri))
+
+  /**
+   * Ensure that all classfile artifacts have been uploaded to the server, and are ready for use.
+   */
+  private[client] def ensureAllClassFileArtifactsUploaded(): Unit = {
+addArtifacts(classFinders.asScala.flatMap(_.findClasses()))
+  }
+
+  /**
+   * Add multiple artifacts to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifacts(uris: Seq[URI]): Unit = addArtifacts(uris.flatMap(parseArtifacts))
+
+  /**
+   * Add a number of artifacts to the session.
+   */
+  private def addArtifacts(artifacts: Iterable[Artifact]): Unit = {
+val promise = Promise[Seq[ArtifactSummary]]
+val responseHandler = new StreamObserver[proto.AddArtifactsResponse] {
+  private val summaries = mutable.Buffer.empty[ArtifactSummary]
+  override def onNext(v: AddArtifactsResponse): Unit = {
+v.getArtifactsList.forEach { summary =>
+  summaries += summary
+}
+  }
+  override def onError(throwable: Throwable): Unit = {
+promise.failure(throwable)
+  }
+  override def onCompl
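
The Promise plus StreamObserver pairing above turns a streamed gRPC response into a single result. A minimal sketch of that pattern, written against a generic element type rather than AddArtifactsResponse:

    import scala.collection.mutable
    import scala.concurrent.Promise

    import io.grpc.stub.StreamObserver

    object ObserverToFutureSketch {
      /** Buffer every streamed element and fulfil the promise once the stream completes. */
      def collecting[T](promise: Promise[Seq[T]]): StreamObserver[T] = new StreamObserver[T] {
        private val buffer = mutable.Buffer.empty[T]
        override def onNext(value: T): Unit = buffer += value
        override def onError(t: Throwable): Unit = promise.failure(t)
        override def onCompleted(): Unit = promise.success(buffer.toSeq)
      }
    }

    // Hypothetical call site: pass collecting(promise) as the response observer,
    // send the requests, then await promise.future for the collected summaries.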

[GitHub] [spark] hvanhovell commented on a diff in pull request #40256: [SPARK-42653][CONNECT] Artifact transfer from Scala/JVM client to Server

2023-03-02 Thread via GitHub


hvanhovell commented on code in PR #40256:
URL: https://github.com/apache/spark/pull/40256#discussion_r1123588842


##
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala:
##
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.client
+
+import java.io.{ByteArrayInputStream, InputStream}
+import java.net.URI
+import java.nio.file.{Files, Path, Paths}
+import java.util.concurrent.CopyOnWriteArrayList
+import java.util.zip.{CheckedInputStream, CRC32}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.concurrent.Promise
+import scala.concurrent.duration.Duration
+import scala.util.control.NonFatal
+
+import Artifact._
+import com.google.protobuf.ByteString
+import io.grpc.ManagedChannel
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.AddArtifactsResponse
+import org.apache.spark.connect.proto.AddArtifactsResponse.ArtifactSummary
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+/**
+ * The Artifact Manager is responsible for handling and transferring artifacts from the local
+ * client to the server (local/remote).
+ * @param userContext
+ * @param channel
+ */
+class ArtifactManager(userContext: proto.UserContext, channel: ManagedChannel) {
+  // Using the midpoint recommendation of 32KiB for chunk size as specified in
+  // https://github.com/grpc/grpc.github.io/issues/371.
+  private val CHUNK_SIZE: Int = 32 * 1024
+
+  private[this] val stub = proto.SparkConnectServiceGrpc.newStub(channel)
+  private[this] val classFinders = new CopyOnWriteArrayList[ClassFinder]
+
+  /**
+   * Register a [[ClassFinder]] for dynamically generated classes.
+   */
+  def register(finder: ClassFinder): Unit = classFinders.add(finder)
+
+  /**
+   * Add a single artifact to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifact(path: String): Unit = {
+addArtifact(Utils.resolveURI(path))
+  }
+
+  private def parseArtifacts(uri: URI): Seq[Artifact] = {
+// Currently only local files with extensions .jar and .class are supported.
+uri.getScheme match {
+  case "file" =>
+val path = Paths.get(uri)
+val artifact = path.getFileName.toString match {
+  case jar if jar.endsWith(".jar") =>
+newJarArtifact(path.getFileName, new LocalFile(path))
+  case cf if cf.endsWith(".class") =>
+newClassArtifact(path.getFileName, new LocalFile(path))
+  case other =>
+throw new UnsupportedOperationException(s"Unsupported file format: $other")
+}
+Seq[Artifact](artifact)
+
+  case other =>
+throw new UnsupportedOperationException(s"Unsupported scheme: $other")
+}
+  }
+
+  /**
+   * Add a single artifact to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifact(uri: URI): Unit = addArtifacts(parseArtifacts(uri))
+
+  /**
+   * Ensure that all classfile artifacts have been uploaded to the server, and are ready for use.
+   */
+  private[client] def ensureAllClassFileArtifactsUploaded(): Unit = {
+addArtifacts(classFinders.asScala.flatMap(_.findClasses()))
+  }
+
+  /**
+   * Add multiple artifacts to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifacts(uris: Seq[URI]): Unit = addArtifacts(uris.flatMap(parseArtifacts))
+
+  /**
+   * Add a number of artifacts to the session.
+   */
+  private def addArtifacts(artifacts: Iterable[Artifact]): Unit = {
+val promise = Promise[Seq[ArtifactSummary]]
+val responseHandler = new StreamObserver[proto.AddArtifactsResponse] {
+  private val summaries = mutable.Buffer.empty[ArtifactSummary]
+  override def onNext(v: AddArtifactsResponse): Unit = {
+v.getArtifactsList.forEach { summary =>
+  summaries += summary
+}
+  }
+  override def onError(throwable: Throwable): Unit = {
+promise.failure(throwable)
+  }
+  override def onCompl
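
For orientation, a hypothetical call site for the API under review; the channel address, user id, and jar path are made up, and only the ArtifactManager constructor and addArtifact signature come from the PR:

    import io.grpc.ManagedChannelBuilder

    import org.apache.spark.connect.proto
    import org.apache.spark.sql.connect.client.ArtifactManager

    object ArtifactManagerUsageSketch {
      def main(args: Array[String]): Unit = {
        // 15002 is the default Spark Connect port; adjust for your deployment.
        val channel = ManagedChannelBuilder.forAddress("localhost", 15002).usePlaintext().build()
        val userContext = proto.UserContext.newBuilder().setUserId("demo-user").build()
        val artifacts = new ArtifactManager(userContext, channel)
        artifacts.addArtifact("/tmp/my-udfs.jar") // local .jar; .class files work the same way
        channel.shutdownNow()
      }
    }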

[GitHub] [spark] peter-toth commented on a diff in pull request #40093: [SPARK-42500][SQL] ConstantPropagation supports more cases

2023-03-02 Thread via GitHub


peter-toth commented on code in PR #40093:
URL: https://github.com/apache/spark/pull/40093#discussion_r1123582002


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala:
##
@@ -200,14 +200,20 @@ object ConstantPropagation extends Rule[LogicalPlan] {
 
   private def replaceConstants(condition: Expression, equalityPredicates: EqualityPredicates)
 : Expression = {
-val constantsMap = AttributeMap(equalityPredicates.map(_._1))
-val predicates = equalityPredicates.map(_._2).toSet
-def replaceConstants0(expression: Expression) = expression transform {
+val allConstantsMap = AttributeMap(equalityPredicates.map(_._1))
+val allPredicates = equalityPredicates.map(_._2).toSet
+def replaceConstants0(
+expression: Expression, constantsMap: AttributeMap[Literal]) = expression transform {
   case a: AttributeReference => constantsMap.getOrElse(a, a)
 }
 condition transform {
-  case e @ EqualTo(_, _) if !predicates.contains(e) => replaceConstants0(e)
-  case e @ EqualNullSafe(_, _) if !predicates.contains(e) => replaceConstants0(e)
+  case b: BinaryComparison =>

Review Comment:
   Let me open a PR tomorrow.
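
   The rewrite under discussion propagates constants established by equality conjuncts into the other comparisons of the same condition. A toy, self-contained sketch of that idea (illustrative ADT, not Catalyst's Expression/AttributeMap API):

    object ConstantPropagationSketch {
      sealed trait Expr
      final case class Attr(name: String) extends Expr
      final case class Lit(value: Int) extends Expr
      final case class EqualTo(left: Expr, right: Expr) extends Expr
      final case class GreaterThan(left: Expr, right: Expr) extends Expr
      final case class And(left: Expr, right: Expr) extends Expr

      // Collect attribute -> literal bindings from the top-level equality conjuncts.
      private def bindings(e: Expr): Map[String, Lit] = e match {
        case And(l, r)                  => bindings(l) ++ bindings(r)
        case EqualTo(Attr(a), lit: Lit) => Map(a -> lit)
        case EqualTo(lit: Lit, Attr(a)) => Map(a -> lit)
        case _                          => Map.empty
      }

      // Substitute known constants for attributes in a single node.
      private def subst(e: Expr, consts: Map[String, Lit]): Expr = e match {
        case Attr(a) => consts.getOrElse(a, Attr(a))
        case other   => other
      }

      // Rewrite non-defining comparisons; the PR extends Catalyst's rule from
      // EqualTo/EqualNullSafe to all binary comparisons in the same spirit.
      def propagate(e: Expr): Expr = {
        val consts = bindings(e)
        def rewrite(expr: Expr): Expr = expr match {
          case And(l, r)         => And(rewrite(l), rewrite(r))
          case GreaterThan(l, r) => GreaterThan(subst(l, consts), subst(r, consts))
          case other             => other // defining equalities stay as they are
        }
        rewrite(e)
      }

      def main(args: Array[String]): Unit = {
        // a = 1 AND a > b   becomes   a = 1 AND 1 > b
        val cond = And(EqualTo(Attr("a"), Lit(1)), GreaterThan(Attr("a"), Attr("b")))
        println(propagate(cond))
      }
    }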



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



  1   2   >