[GitHub] [spark] ulysses-you commented on pull request #40262: [SPARK-42651][SQL] Optimize global sort to driver sort
ulysses-you commented on PR #40262: URL: https://github.com/apache/spark/pull/40262#issuecomment-1453105982 @zhengruifeng thank you for your thought. The original idea of driver sort is to avoid one shuffle. Requiring SinglePartition does not seem to help, since it still requires a shuffle. Besides, the result ultimately goes to the driver, i.e. `df.sort.collect` (which is the reason I match `ReturnAnswer`), so it should be fine to do the sort at the driver. Also, driver sort is not the first place in the code that sorts at the driver; e.g., the merge function of rdd.takeOrdered already does so. It should be safe as long as the plan's output is small enough.
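For readers unfamiliar with the precedent mentioned above: `RDD.takeOrdered` computes a top-k per partition and merges the partial results on the driver, so a small driver-side sort already exists in Spark. A minimal sketch using only the standard RDD API (not code from this PR):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("takeOrdered-demo").getOrCreate()
val sc = spark.sparkContext

// Each partition produces its own top-10; those small partial results are then
// merged (effectively sorted) on the driver before being returned.
val smallest = sc.parallelize(1 to 100000, numSlices = 8).takeOrdered(10)
println(smallest.mkString(", "))

spark.stop()
```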
[GitHub] [spark] mskapilks commented on pull request #40266: [SPARK-42660] Infer filters for Join produced by IN and EXISTS clause (RewritePredicateSubquery rule)
mskapilks commented on PR #40266: URL: https://github.com/apache/spark/pull/40266#issuecomment-1453097749 TPCH q21 plan change

|Before|After|
|--|--|
|![Web capture_3-3-2023_125513_ms web azuresynapse net_crop](https://user-images.githubusercontent.com/106726387/222658668-830d9ddb-2cc2-4013-8dbc-29967d957954.jpeg)|![Web capture_3-3-2023_12324_ms web azuresynapse net_crop](https://user-images.githubusercontent.com/106726387/222658801-f917a4cd-f825-4567-aca8-cb596dd64e51.jpeg)|
[GitHub] [spark] zhengruifeng commented on pull request #40262: [SPARK-42651][SQL] Optimize global sort to driver sort
zhengruifeng commented on PR #40262: URL: https://github.com/apache/spark/pull/40262#issuecomment-1453092378 Just an idea: what about making `SortExec` require `SinglePartition`?
[GitHub] [spark] zhengruifeng commented on pull request #40259: [SPARK-42609][CONNECT][TESTS] Add tests for grouping() and grouping_id() functions
zhengruifeng commented on PR #40259: URL: https://github.com/apache/spark/pull/40259#issuecomment-1453086077 merged into master/3.4
[GitHub] [spark] beliefer commented on pull request #40252: [SPARK-42555][CONNECT] Add JDBC to DataFrameReader
beliefer commented on PR #40252: URL: https://github.com/apache/spark/pull/40252#issuecomment-1453086009
> @beliefer you can create a test in `PlanGenerationTestSuite`. That will at least validate the proto message we are generating, and it will validate that plan you are producing yields a valid plan in `ProtoToPlanTestSuite`.

OK. I added the test cases, but it throws
```
org.h2.jdbc.JdbcSQLSyntaxErrorException: Schema "TEST" not found; SQL statement:
SELECT * FROM TEST.TIMETYPES WHERE 1=0 [90079-214]
```
when `ProtoToParsedPlanTestSuite` validates the golden file. The built-in H2 runs on the server side, and we can't start the H2 database from the connect API. @hvanhovell Could you tell me how to fix or avoid it?
[GitHub] [spark] zhengruifeng closed pull request #40259: [SPARK-42609][CONNECT][TESTS] Add tests for grouping() and grouping_id() functions
zhengruifeng closed pull request #40259: [SPARK-42609][CONNECT][TESTS] Add tests for grouping() and grouping_id() functions URL: https://github.com/apache/spark/pull/40259
[GitHub] [spark] mskapilks opened a new pull request, #40266: [SPARK-42660] Infer filters for Join produced by IN and EXISTS clause (RewritePredicateSubquery rule)
mskapilks opened a new pull request, #40266: URL: https://github.com/apache/spark/pull/40266
### What changes were proposed in this pull request?
We should run `InferFiltersFromConstraints` again after running the `RewritePredicateSubquery` rule. `RewritePredicateSubquery` rewrites IN and EXISTS queries to LEFT SEMI/LEFT ANTI joins, but we don't infer filters for these newly generated joins. We noticed on TPCH 1TB q21 that by inferring filters for these new joins, one `lineitem` table scan can be removed, because a `ReusedExchange` gets introduced. Previously, due to a mismatch in filter predicates, reuse was not happening.
### Why are the changes needed?
Can improve query performance.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
PlanStability test
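For context, a generic sketch (not the TPCH q21 query) of the rewrite being discussed: the optimizer's `RewritePredicateSubquery` rule turns the EXISTS predicate below into a LEFT SEMI join, and the PR's idea is to run `InferFiltersFromConstraints` again afterwards so that filters (e.g. an `IsNotNull` on the join key) can also be inferred for that generated join. Table and column names here are illustrative only.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()

spark.range(100).selectExpr("id AS l_orderkey", "id % 7 AS l_suppkey")
  .createOrReplaceTempView("lineitem")
spark.range(100).selectExpr("id AS o_orderkey")
  .createOrReplaceTempView("orders")

// The EXISTS clause is rewritten into a LEFT SEMI join by RewritePredicateSubquery;
// explain(true) shows the optimized plan, where the effect of any additional
// filter inference on the join keys would be visible.
spark.sql(
  """
    |SELECT l_orderkey, l_suppkey
    |FROM lineitem l
    |WHERE EXISTS (SELECT 1 FROM orders o WHERE o.o_orderkey = l.l_orderkey)
    |""".stripMargin).explain(true)
```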
[GitHub] [spark] beliefer opened a new pull request, #40265: [SPARK-42556][CONNECT] Dataset.colregex should link a plan_id when it only matches a single column.
beliefer opened a new pull request, #40265: URL: https://github.com/apache/spark/pull/40265
### What changes were proposed in this pull request?
When colregex returns a single column it should link the plan's plan_id. For reference, here is the non-connect Dataset code that does this: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L1512 This also needs to be fixed for the Python client.
### Why are the changes needed?
Let the UnresolvedAttribute link the plan_id if it exists.
### Does this PR introduce _any_ user-facing change?
No. New feature.
### How was this patch tested?
New test cases.
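For reference, a minimal usage sketch of `colRegex` on the (non-Connect) Scala Dataset API that the PR points to; the single-match case in the first `select` is the one whose resulting column should carry the DataFrame's plan_id in Spark Connect. Column names here are illustrative only.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val df = Seq(("Alice", 1), ("Bob", 2)).toDF("name", "age")

// A backtick-quoted argument is treated as a regex over column names.
// Matches exactly one column: the case this PR tags with plan_id.
df.select(df.colRegex("`name`")).show()

// Matches several columns: expands to all of them.
df.select(df.colRegex("`(name|age)`")).show()
```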
[GitHub] [spark] chenhao-db commented on pull request #40264: [SPARK-42635][SQL][3.3] Fix the TimestampAdd expression
chenhao-db commented on PR #40264: URL: https://github.com/apache/spark/pull/40264#issuecomment-1453062944 @MaxGekk Please take a look, thanks for reviewing!
[GitHub] [spark] chenhao-db opened a new pull request, #40264: [SPARK-42635][SQL][3.3] Fix the TimestampAdd expression
chenhao-db opened a new pull request, #40264: URL: https://github.com/apache/spark/pull/40264 This is a backport of #40237.
### What changes were proposed in this pull request?
This PR fixed the counter-intuitive behaviors of the `TimestampAdd` expression mentioned in https://issues.apache.org/jira/browse/SPARK-42635. See the following *user-facing* changes for details.
### Does this PR introduce _any_ user-facing change?
Yes. This PR fixes the three problems mentioned in SPARK-42635:
1. When the time is close to a daylight saving time transition, the result may be discontinuous and not monotonic.
2. Adding month, quarter, and year silently ignores `Int` overflow during unit conversion.
3. Adding sub-month units (week, day, hour, minute, second, millisecond, microsecond) silently ignores `Long` overflow during unit conversion.

Some examples of the result changes:

Old results:
```
// In America/Los_Angeles timezone:
timestampadd(DAY, 1, 2011-03-12 03:00:00) = 2011-03-13 03:00:00 (this is correct, put it here for comparison)
timestampadd(HOUR, 23, 2011-03-12 03:00:00) = 2011-03-13 03:00:00
timestampadd(HOUR, 24, 2011-03-12 03:00:00) = 2011-03-13 03:00:00
timestampadd(SECOND, 86400 - 1, 2011-03-12 03:00:00) = 2011-03-13 03:59:59
timestampadd(SECOND, 86400, 2011-03-12 03:00:00) = 2011-03-13 03:00:00
// In UTC timezone:
timestampadd(quarter, 1431655764, 1970-01-01 00:00:00) = 1969-09-01 00:00:00
timestampadd(day, 106751992, 1970-01-01 00:00:00) = -290308-12-22 15:58:10.448384
```

New results:
```
// In America/Los_Angeles timezone:
timestampadd(DAY, 1, 2011-03-12 03:00:00) = 2011-03-13 03:00:00
timestampadd(HOUR, 23, 2011-03-12 03:00:00) = 2011-03-13 03:00:00
timestampadd(HOUR, 24, 2011-03-12 03:00:00) = 2011-03-13 04:00:00
timestampadd(SECOND, 86400 - 1, 2011-03-12 03:00:00) = 2011-03-13 03:59:59
timestampadd(SECOND, 86400, 2011-03-12 03:00:00) = 2011-03-13 04:00:00
// In UTC timezone:
timestampadd(quarter, 1431655764, 1970-01-01 00:00:00) = throw overflow exception
timestampadd(day, 106751992, 1970-01-01 00:00:00) = throw overflow exception
```
### How was this patch tested?
Pass existing tests and some new tests.
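The DST case above can be reproduced from SQL with the built-in `timestampadd` function (available since Spark 3.3); the snippet below only sets the session time zone explicitly so the example is self-contained, and the expected outputs follow the "new results" listed in the PR description.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// America/Los_Angeles springs forward at 2011-03-13 02:00, so that calendar day
// has only 23 hours; adding 24 physical hours should land on 04:00, not 03:00.
spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")

spark.sql("SELECT timestampadd(HOUR, 24, TIMESTAMP'2011-03-12 03:00:00')").show(truncate = false)
spark.sql("SELECT timestampadd(SECOND, 86400, TIMESTAMP'2011-03-12 03:00:00')").show(truncate = false)
```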
[GitHub] [spark] MaxGekk commented on pull request #40237: [SPARK-42635][SQL] Fix the TimestampAdd expression.
MaxGekk commented on PR #40237: URL: https://github.com/apache/spark/pull/40237#issuecomment-1453059745 @chenhao-db Congratulations on your first contribution to Apache Spark!
[GitHub] [spark] MaxGekk closed pull request #40237: [SPARK-42635][SQL] Fix the TimestampAdd expression.
MaxGekk closed pull request #40237: [SPARK-42635][SQL] Fix the TimestampAdd expression. URL: https://github.com/apache/spark/pull/40237
[GitHub] [spark] MaxGekk commented on pull request #40237: [SPARK-42635][SQL] Fix the TimestampAdd expression.
MaxGekk commented on PR #40237: URL: https://github.com/apache/spark/pull/40237#issuecomment-1453051043 @chenhao-db Your changes cause some conflicts in `branch-3.3`. Please, open a separate PR with a backport to Spark 3.3.
[GitHub] [spark] MaxGekk commented on pull request #40237: [SPARK-42635][SQL] Fix the TimestampAdd expression.
MaxGekk commented on PR #40237: URL: https://github.com/apache/spark/pull/40237#issuecomment-1453050502 +1, LGTM. Merging to master/3.4. Thank you, @chenhao-db.
[GitHub] [spark] the8thC commented on pull request #40236: [SPARK-38735][SQL][Tests] Add tests for the error class: INTERNAL_ERROR
the8thC commented on PR #40236: URL: https://github.com/apache/spark/pull/40236#issuecomment-1453042684 @MaxGekk Can you take a look at this please?
[GitHub] [spark] chenhao-db commented on pull request #40237: [SPARK-42635][SQL] Fix the TimestampAdd expression.
chenhao-db commented on PR #40237: URL: https://github.com/apache/spark/pull/40237#issuecomment-1453041214 @MaxGekk Thanks for reviewing! I guess I need to ask you to merge this PR, right? Also, it should be backported to the older branches that have `TimestampAdd` (3.3.0, 3.3.1, 3.3.2).
[GitHub] [spark] HyukjinKwon closed pull request #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client
HyukjinKwon closed pull request #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client URL: https://github.com/apache/spark/pull/40257
[GitHub] [spark] ritikam2 commented on pull request #40116: [SPARK-41391][SQL] The output column name of groupBy.agg(count_distinct) is incorrect
ritikam2 commented on PR #40116: URL: https://github.com/apache/spark/pull/40116#issuecomment-1453013580 @cloud-fan always using unresolvedAlias seems to be causing the sql-other module to fail. Will be reverting to the original fix of creating unresolvedAlias only for "*" or distinct.
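For context, SPARK-41391 is about the auto-generated output column name of aggregates like the one below. A minimal repro sketch, shown here with the Scala API; the exact generated names before and after the fix are not reproduced here, `printSchema` simply shows whatever name the current build produces.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, count_distinct}

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val df = Seq(("a", 1), ("a", 1), ("b", 2)).toDF("k", "v")

// The generated output column name for count_distinct is what the PR adjusts.
df.groupBy("k").agg(count_distinct(col("v"))).printSchema()
```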
[GitHub] [spark] HyukjinKwon commented on pull request #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client
HyukjinKwon commented on PR #40257: URL: https://github.com/apache/spark/pull/40257#issuecomment-1453013522 Merged to master and branch-3.4.
[GitHub] [spark] HyukjinKwon commented on pull request #40238: [SPARK-42633][CONNECT] Make LocalRelation take an actual schema
HyukjinKwon commented on PR #40238: URL: https://github.com/apache/spark/pull/40238#issuecomment-1453013336 test: https://github.com/hvanhovell/spark/actions/runs/4318951397
[GitHub] [spark] zhengruifeng opened a new pull request, #40263: [SPARK-42659][ML] Reimplement `FPGrowthModel.transform` with dataframe operations
zhengruifeng opened a new pull request, #40263: URL: https://github.com/apache/spark/pull/40263
### What changes were proposed in this pull request?
Reimplement `FPGrowthModel.transform` with dataframe operations
### Why are the changes needed?
delay the `collect()` of model dataframe `associationRules`
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
existing UT
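For readers less familiar with the API, a standard `FPGrowth` usage sketch (adapted from the common documentation example, not code from this PR): `transform` is the step that applies `associationRules`, which is what the PR reimplements with DataFrame operations so the rules no longer need to be collected eagerly.

```scala
import org.apache.spark.ml.fpm.FPGrowth
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val dataset = spark.createDataset(Seq("1 2 5", "1 2 3 5", "1 2"))
  .map(_.split(" "))
  .toDF("items")

val model = new FPGrowth()
  .setItemsCol("items")
  .setMinSupport(0.5)
  .setMinConfidence(0.6)
  .fit(dataset)

// transform() predicts consequents for each basket from the mined rules.
model.transform(dataset).show(truncate = false)
model.associationRules.show(truncate = false)
```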
[GitHub] [spark] mridulm commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache
mridulm commented on code in PR #39459: URL: https://github.com/apache/spark/pull/39459#discussion_r1124028359

## core/src/main/scala/org/apache/spark/internal/config/package.scala:
```
@@ -2468,4 +2468,15 @@ package object config {
       .version("3.4.0")
       .booleanConf
       .createWithDefault(false)
+
+  private[spark] val RDD_CACHE_VISIBILITY_TRACKING_ENABLED =
+    ConfigBuilder("spark.rdd.cache.visibilityTracking.enabled")
+      .internal()
+      .doc("Set to be true to enabled RDD cache block's visibility status. Once it's enabled," +
+        " a RDD cache block can be used only when it's marked as visible. And a RDD block will be" +
+        " marked as visible only when one of the tasks generating the cache block finished" +
+        " successfully. This is relevant in context of consistent accumulator status.")
+      .version("3.4.0")
```
Review Comment: No, we should mark this as 3.5.0 instead ... slipped through the review (it was created way before 3.4 was cut). Thanks for catching this @ulysses-you ! Can you create a follow up to change this to 3.5 @ivoson ? Thx
[GitHub] [spark] ulysses-you opened a new pull request, #40262: [SPARK-42651][SQL] Optimize global sort to driver sort
ulysses-you opened a new pull request, #40262: URL: https://github.com/apache/spark/pull/40262
### What changes were proposed in this pull request?
This pr adds a new physical plan `DriverSortExec`. It represents the special case of a global `SortExec` that is the root node. Then we can save one shuffle. Add a new config `spark.sql.execution.driverSortThreshold` to control whether it's safe to do the sort at the driver side. We should make sure the max rows of the plan is small enough to avoid OOM and performance regression. This optimization should work fine since it benefits from the AQE framework: the max rows of the logical plan is accurate and can be propagated across most nodes.
### Why are the changes needed?
Improve performance
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
add test and do some benchmark

Sort 50,000 rows:
```
OpenJDK 64-Bit Server VM 1.8.0_362-b09 on Linux 5.15.0-1033-azure
Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz
DriverSort:        Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
Sort                        1158           1355         210         0.0       23169.3       1.0X
DriverSort                   636            680          55         0.1       12715.2       1.8X
```
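A hedged sketch of the query shape this PR targets: a global sort whose small result is collected to the driver. `DriverSortExec` and `spark.sql.execution.driverSortThreshold` are proposals in this PR, not part of released Spark, so the snippet only shows the pattern using standard APIs.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.rand

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// Today a global sort plans a range-partitioning shuffle to satisfy SortExec's
// required distribution. When the result is collected to the driver anyway and
// the row count is known (via AQE) to be small, the PR proposes sorting on the
// driver instead, saving that shuffle.
val rows = spark.range(0, 50000)
  .select(rand(42).as("v"))
  .sort("v")
  .collect()

println(rows.length)
```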
[GitHub] [spark] zhenlineo commented on a diff in pull request #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client
zhenlineo commented on code in PR #40257: URL: https://github.com/apache/spark/pull/40257#discussion_r1123993741

## connector/connect/bin/spark-connect:
```
@@ -0,0 +1,32 @@
+#!/usr/bin/env bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Go to the Spark project root directory
+FWDIR="$(cd "`dirname "$0"`"/../../..; pwd)"
+cd "$FWDIR"
+export SPARK_HOME=$FWDIR
+
+# Build the jars needed for spark submit and spark connect
+build/sbt package
```
Review Comment: This script ensures that the jars are built up-to-date, so that we can run the client/server to test out the new code quickly. This could be useful, for example, to test/debug udf etc. This is also why we keep it here close to the client.
[GitHub] [spark] HyukjinKwon commented on a diff in pull request #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client
HyukjinKwon commented on code in PR #40257: URL: https://github.com/apache/spark/pull/40257#discussion_r1123993315 ## connector/connect/bin/spark-connect-scala-client: ## @@ -0,0 +1,50 @@ +#!/usr/bin/env bash + +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Use the spark connect JVM client to connect to a spark connect server. +# +# Start a local server: +# A local spark-connect server with default settings can be started using the following command: +# `connector/connect/bin/spark-connect` +# The client should be able to connect to this server directly with the default client settings. +# +# Connect to a remote server: +# To connect to a remote server, use env var `SPARK_REMOTE` to configure the client connection +# string. e.g. +# `export SPARK_REMOTE="sc://:/;token=;="` + +# Go to the Spark project root directory +FWDIR="$(cd "`dirname "$0"`"/../../..; pwd)" +cd "$FWDIR" +export SPARK_HOME=$FWDIR + +# Build the jars needed for spark connect JVM client +build/sbt "sql/package;connect-client-jvm/assembly" Review Comment: sorry I misread the path. ## connector/connect/bin/spark-connect: ## @@ -0,0 +1,32 @@ +#!/usr/bin/env bash + +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Go to the Spark project root directory +FWDIR="$(cd "`dirname "$0"`"/../../..; pwd)" +cd "$FWDIR" +export SPARK_HOME=$FWDIR + +# Build the jars needed for spark submit and spark connect +build/sbt package Review Comment: sorry I misread the path. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] zhengruifeng commented on a diff in pull request #40260: [SPARK-42630][CONNECT][PYTHON] Delay parsing DDL string until SparkConnectClient is available
zhengruifeng commented on code in PR #40260: URL: https://github.com/apache/spark/pull/40260#discussion_r1123992438

## python/pyspark/sql/connect/_typing.py:
```
@@ -57,7 +57,7 @@ class UserDefinedFunctionLike(Protocol):
     deterministic: bool

     @property
-    def returnType(self) -> DataType:
+    def returnType(self) -> DataTypeOrString:
```
Review Comment: Yeah, but I am still confused about it: the old implementation `PySparkSession.builder.getOrCreate().createDataFrame(data=[], schema=data_type).schema` works. I also tried
```
session = PySparkSession.builder.getOrCreate()
parsed = session.client._analyze(  # type: ignore[attr-defined]
    method="ddl_parse", ddl_string=data_type
).parsed
```
and at least the tests passed. But if I try
```
parsed = PySparkSession.builder.getOrCreate().client._analyze(  # type: ignore[attr-defined]
    method="ddl_parse", ddl_string=data_type
).parsed
```
the tests always fail with `ValueError: Cannot invoke RPC on closed channel!` Maybe we will have to add a pure Python DDL parser, I don't know.
[GitHub] [spark] HyukjinKwon commented on a diff in pull request #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client
HyukjinKwon commented on code in PR #40257: URL: https://github.com/apache/spark/pull/40257#discussion_r1123990449 ## connector/connect/bin/spark-connect: ## @@ -0,0 +1,32 @@ +#!/usr/bin/env bash + +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Go to the Spark project root directory +FWDIR="$(cd "`dirname "$0"`"/../../..; pwd)" +cd "$FWDIR" +export SPARK_HOME=$FWDIR + +# Build the jars needed for spark submit and spark connect +build/sbt package Review Comment: Should probably put it in `dev` directory if this is for dev .. BTW, we have added `connect-` prefix in connect related scripts cc @grundprinzip FYI -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] HyukjinKwon commented on a diff in pull request #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client
HyukjinKwon commented on code in PR #40257: URL: https://github.com/apache/spark/pull/40257#discussion_r1123990665 ## connector/connect/bin/spark-connect-scala-client: ## @@ -0,0 +1,50 @@ +#!/usr/bin/env bash + +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Use the spark connect JVM client to connect to a spark connect server. +# +# Start a local server: +# A local spark-connect server with default settings can be started using the following command: +# `connector/connect/bin/spark-connect` +# The client should be able to connect to this server directly with the default client settings. +# +# Connect to a remote server: +# To connect to a remote server, use env var `SPARK_REMOTE` to configure the client connection +# string. e.g. +# `export SPARK_REMOTE="sc://:/;token=;="` + +# Go to the Spark project root directory +FWDIR="$(cd "`dirname "$0"`"/../../..; pwd)" +cd "$FWDIR" +export SPARK_HOME=$FWDIR + +# Build the jars needed for spark connect JVM client +build/sbt "sql/package;connect-client-jvm/assembly" Review Comment: ditto. If this is for dev, I would put this in `dev` directory. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] hvanhovell commented on a diff in pull request #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client
hvanhovell commented on code in PR #40257: URL: https://github.com/apache/spark/pull/40257#discussion_r1123990365 ## connector/connect/bin/spark-connect-scala-client: ## @@ -0,0 +1,50 @@ +#!/usr/bin/env bash + +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Use the spark connect JVM client to connect to a spark connect server. +# +# Start a local server: +# A local spark-connect server with default settings can be started using the following command: +# `connector/connect/bin/spark-connect` +# The client should be able to connect to this server directly with the default client settings. +# +# Connect to a remote server: +# To connect to a remote server, use env var `SPARK_REMOTE` to configure the client connection +# string. e.g. +# `export SPARK_REMOTE="sc://:/;token=;="` + +# Go to the Spark project root directory +FWDIR="$(cd "`dirname "$0"`"/../../..; pwd)" +cd "$FWDIR" +export SPARK_HOME=$FWDIR + +# Build the jars needed for spark connect JVM client +build/sbt "sql/package;connect-client-jvm/assembly" Review Comment: why? this is meant for development. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] HyukjinKwon commented on a diff in pull request #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client
HyukjinKwon commented on code in PR #40257: URL: https://github.com/apache/spark/pull/40257#discussion_r1123990101 ## connector/connect/bin/spark-connect-scala-client: ## @@ -0,0 +1,50 @@ +#!/usr/bin/env bash + +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Use the spark connect JVM client to connect to a spark connect server. +# +# Start a local server: +# A local spark-connect server with default settings can be started using the following command: +# `connector/connect/bin/spark-connect` +# The client should be able to connect to this server directly with the default client settings. +# +# Connect to a remote server: +# To connect to a remote server, use env var `SPARK_REMOTE` to configure the client connection +# string. e.g. +# `export SPARK_REMOTE="sc://:/;token=;="` + +# Go to the Spark project root directory +FWDIR="$(cd "`dirname "$0"`"/../../..; pwd)" +cd "$FWDIR" +export SPARK_HOME=$FWDIR + +# Build the jars needed for spark connect JVM client +build/sbt "sql/package;connect-client-jvm/assembly" Review Comment: Can we make `spark-shell` support this, with `--remote` and `--jars` or `--packages` option? That would be ideal. Again I don't mind doing this separately. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] HyukjinKwon commented on a diff in pull request #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client
HyukjinKwon commented on code in PR #40257: URL: https://github.com/apache/spark/pull/40257#discussion_r1123989785 ## connector/connect/bin/spark-connect-scala-client: ## @@ -0,0 +1,50 @@ +#!/usr/bin/env bash + +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Use the spark connect JVM client to connect to a spark connect server. +# +# Start a local server: +# A local spark-connect server with default settings can be started using the following command: +# `connector/connect/bin/spark-connect` +# The client should be able to connect to this server directly with the default client settings. +# +# Connect to a remote server: +# To connect to a remote server, use env var `SPARK_REMOTE` to configure the client connection +# string. e.g. +# `export SPARK_REMOTE="sc://:/;token=;="` + +# Go to the Spark project root directory +FWDIR="$(cd "`dirname "$0"`"/../../..; pwd)" +cd "$FWDIR" +export SPARK_HOME=$FWDIR + +# Build the jars needed for spark connect JVM client +build/sbt "sql/package;connect-client-jvm/assembly" Review Comment: It's actually a bit odd to have build command here .. it won't work once we release this out. I don't mind addressing this separately but we would have to fix it before the release. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] hvanhovell commented on a diff in pull request #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client
hvanhovell commented on code in PR #40257: URL: https://github.com/apache/spark/pull/40257#discussion_r1123989425 ## connector/connect/bin/spark-connect: ## @@ -0,0 +1,32 @@ +#!/usr/bin/env bash + +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Go to the Spark project root directory +FWDIR="$(cd "`dirname "$0"`"/../../..; pwd)" +cd "$FWDIR" +export SPARK_HOME=$FWDIR + +# Build the jars needed for spark submit and spark connect +build/sbt package Review Comment: This is more aimed at development. There are quite a few differences: - This builds the code if we need it - This does not return after starting the process. It actually logs to the console which makes debugging easier. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] amaliujia commented on pull request #40256: [SPARK-42653][CONNECT] Artifact transfer from Scala/JVM client to Server
amaliujia commented on PR #40256: URL: https://github.com/apache/spark/pull/40256#issuecomment-1452899534 Overall looks good. Thank you!
[GitHub] [spark] amaliujia commented on a diff in pull request #40256: [SPARK-42653][CONNECT] Artifact transfer from Scala/JVM client to Server
amaliujia commented on code in PR #40256: URL: https://github.com/apache/spark/pull/40256#discussion_r1123987429 ## connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala: ## @@ -0,0 +1,305 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.connect.client + +import java.io.InputStream +import java.net.URI +import java.nio.file.{Files, Path, Paths} +import java.util.zip.{CheckedInputStream, CRC32} + +import scala.collection.mutable +import scala.concurrent.Promise +import scala.concurrent.duration.Duration +import scala.util.control.NonFatal + +import Artifact._ +import com.google.protobuf.ByteString +import io.grpc.ManagedChannel +import io.grpc.stub.StreamObserver + +import org.apache.spark.connect.proto +import org.apache.spark.connect.proto.AddArtifactsResponse +import org.apache.spark.connect.proto.AddArtifactsResponse.ArtifactSummary +import org.apache.spark.util.{ThreadUtils, Utils} + +/** + * The Artifact Manager is responsible for handling and transferring artifacts from the local + * client to the server (local/remote). + * @param userContext + * @param channel + */ +class ArtifactManager(userContext: proto.UserContext, channel: ManagedChannel) { + // Using the midpoint recommendation of 32KiB for chunk size as specified in + // https://github.com/grpc/grpc.github.io/issues/371. + private val CHUNK_SIZE: Int = 32 * 1024 + + private[this] val stub = proto.SparkConnectServiceGrpc.newStub(channel) + + /** + * Add a single artifact to the session. + * + * Currently only local files with extensions .jar and .class are supported. + */ + def addArtifact(path: String): Unit = { +addArtifact(Utils.resolveURI(path)) + } + + private def parseArtifacts(uri: URI): Seq[Artifact] = { +// Currently only local files with extensions .jar and .class are supported. +uri.getScheme match { + case "file" => +val path = Paths.get(uri) +val artifact = path.getFileName.toString match { + case jar if jar.endsWith(".jar") => +newJarArtifact(path.getFileName, new LocalFile(path)) + case cf if cf.endsWith(".class") => +newClassArtifact(path.getFileName, new LocalFile(path)) + case other => +throw new UnsupportedOperationException(s"Unsuppoted file format: $other") +} +Seq[Artifact](artifact) + + case other => +throw new UnsupportedOperationException(s"Unsupported scheme: $other") +} + } + + /** + * Add a single artifact to the session. + * + * Currently only local files with extensions .jar and .class are supported. + */ + def addArtifact(uri: URI): Unit = addArtifacts(parseArtifacts(uri)) + + /** + * Add multiple artifacts to the session. + * + * Currently only local files with extensions .jar and .class are supported. 
+ */ + def addArtifacts(uris: Seq[URI]): Unit = addArtifacts(uris.flatMap(parseArtifacts)) + + /** + * Add a number of artifacts to the session. + */ + private def addArtifacts(artifacts: Iterable[Artifact]): Unit = { +val promise = Promise[Seq[ArtifactSummary]] +val responseHandler = new StreamObserver[proto.AddArtifactsResponse] { + private val summaries = mutable.Buffer.empty[ArtifactSummary] + override def onNext(v: AddArtifactsResponse): Unit = { +v.getArtifactsList.forEach { summary => + summaries += summary +} + } + override def onError(throwable: Throwable): Unit = { +promise.failure(throwable) + } + override def onCompleted(): Unit = { +promise.success(summaries.toSeq) + } +} +val stream = stub.addArtifacts(responseHandler) +val currentBatch = mutable.Buffer.empty[Artifact] +var currentBatchSize = 0L + +def addToBatch(dep: Artifact, size: Long): Unit = { + currentBatch += dep + currentBatchSize += size +} + +def writeBatch(): Unit = { + addBatchedArtifacts(currentBatch.toSeq, stream) + currentBatch.clear() + currentBatchSize = 0 +} + +artifacts.iterator.foreach { artifact => + val data = artifact.storage + val size = data.size
[GitHub] [spark] HyukjinKwon commented on a diff in pull request #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client
HyukjinKwon commented on code in PR #40257: URL: https://github.com/apache/spark/pull/40257#discussion_r1123987387 ## connector/connect/bin/spark-connect: ## @@ -0,0 +1,32 @@ +#!/usr/bin/env bash + +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Go to the Spark project root directory +FWDIR="$(cd "`dirname "$0"`"/../../..; pwd)" +cd "$FWDIR" +export SPARK_HOME=$FWDIR + +# Build the jars needed for spark submit and spark connect +build/sbt package Review Comment: Hm, I guess this can be replaced by `./sbin/start-connect-server.sh --jars `ls connector/connect/server/target/**/spark-connect*SNAPSHOT.jar`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] hvanhovell commented on a diff in pull request #40256: [SPARK-42653][CONNECT] Artifact transfer from Scala/JVM client to Server
hvanhovell commented on code in PR #40256: URL: https://github.com/apache/spark/pull/40256#discussion_r1123986940 ## connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala: ## @@ -0,0 +1,305 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.connect.client + +import java.io.InputStream +import java.net.URI +import java.nio.file.{Files, Path, Paths} +import java.util.zip.{CheckedInputStream, CRC32} + +import scala.collection.mutable +import scala.concurrent.Promise +import scala.concurrent.duration.Duration +import scala.util.control.NonFatal + +import Artifact._ +import com.google.protobuf.ByteString +import io.grpc.ManagedChannel +import io.grpc.stub.StreamObserver + +import org.apache.spark.connect.proto +import org.apache.spark.connect.proto.AddArtifactsResponse +import org.apache.spark.connect.proto.AddArtifactsResponse.ArtifactSummary +import org.apache.spark.util.{ThreadUtils, Utils} + +/** + * The Artifact Manager is responsible for handling and transferring artifacts from the local + * client to the server (local/remote). + * @param userContext + * @param channel + */ +class ArtifactManager(userContext: proto.UserContext, channel: ManagedChannel) { + // Using the midpoint recommendation of 32KiB for chunk size as specified in + // https://github.com/grpc/grpc.github.io/issues/371. + private val CHUNK_SIZE: Int = 32 * 1024 + + private[this] val stub = proto.SparkConnectServiceGrpc.newStub(channel) + + /** + * Add a single artifact to the session. + * + * Currently only local files with extensions .jar and .class are supported. + */ + def addArtifact(path: String): Unit = { +addArtifact(Utils.resolveURI(path)) + } + + private def parseArtifacts(uri: URI): Seq[Artifact] = { +// Currently only local files with extensions .jar and .class are supported. +uri.getScheme match { + case "file" => +val path = Paths.get(uri) +val artifact = path.getFileName.toString match { + case jar if jar.endsWith(".jar") => +newJarArtifact(path.getFileName, new LocalFile(path)) + case cf if cf.endsWith(".class") => +newClassArtifact(path.getFileName, new LocalFile(path)) + case other => +throw new UnsupportedOperationException(s"Unsuppoted file format: $other") +} +Seq[Artifact](artifact) + + case other => +throw new UnsupportedOperationException(s"Unsupported scheme: $other") +} + } + + /** + * Add a single artifact to the session. + * + * Currently only local files with extensions .jar and .class are supported. + */ + def addArtifact(uri: URI): Unit = addArtifacts(parseArtifacts(uri)) + + /** + * Add multiple artifacts to the session. + * + * Currently only local files with extensions .jar and .class are supported. 
+ */ + def addArtifacts(uris: Seq[URI]): Unit = addArtifacts(uris.flatMap(parseArtifacts)) + + /** + * Add a number of artifacts to the session. + */ + private def addArtifacts(artifacts: Iterable[Artifact]): Unit = { +val promise = Promise[Seq[ArtifactSummary]] +val responseHandler = new StreamObserver[proto.AddArtifactsResponse] { + private val summaries = mutable.Buffer.empty[ArtifactSummary] + override def onNext(v: AddArtifactsResponse): Unit = { +v.getArtifactsList.forEach { summary => + summaries += summary +} + } + override def onError(throwable: Throwable): Unit = { +promise.failure(throwable) + } + override def onCompleted(): Unit = { +promise.success(summaries.toSeq) + } +} +val stream = stub.addArtifacts(responseHandler) +val currentBatch = mutable.Buffer.empty[Artifact] +var currentBatchSize = 0L + +def addToBatch(dep: Artifact, size: Long): Unit = { + currentBatch += dep + currentBatchSize += size +} + +def writeBatch(): Unit = { + addBatchedArtifacts(currentBatch.toSeq, stream) + currentBatch.clear() + currentBatchSize = 0 +} + +artifacts.iterator.foreach { artifact => + val data = artifact.storage + val size = data.size
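For context, a hedged usage sketch of the client-side API shown in the diff above. The class and method names (`ArtifactManager`, `addArtifact`, `addArtifacts`) come from the quoted code; the file paths and the way `userContext`/`channel` are obtained are placeholder assumptions (in practice they come from the client's connection setup).
```scala
import java.net.URI

import io.grpc.ManagedChannel

import org.apache.spark.connect.proto
import org.apache.spark.sql.connect.client.ArtifactManager

object ArtifactManagerUsage {
  // `userContext` and `channel` are assumed to be provided by the caller.
  def uploadArtifacts(userContext: proto.UserContext, channel: ManagedChannel): Unit = {
    val artifactManager = new ArtifactManager(userContext, channel)
    // Single local artifact; only .jar and .class files are supported.
    artifactManager.addArtifact("/tmp/my-udfs.jar")
    // Several artifacts at once; each is split into 32 KiB chunks and batched on the wire.
    artifactManager.addArtifacts(
      Seq(new URI("file:///tmp/Foo.class"), new URI("file:///tmp/extra.jar")))
  }
}
```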
[GitHub] [spark] wangyum commented on a diff in pull request #40190: [SPARK-42597][SQL] UnwrapCastInBinaryComparison support unwrap timestamp type
wangyum commented on code in PR #40190: URL: https://github.com/apache/spark/pull/40190#discussion_r1123984775 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/UnwrapCastInBinaryComparison.scala: ## @@ -293,6 +299,34 @@ object UnwrapCastInBinaryComparison extends Rule[LogicalPlan] { } } + /** + * Move the cast to the literal side, because we can only get the minimum value of timestamp, + * so some BinaryComparison needs to be changed, + * such as CAST(ts AS date) > DATE '2023-01-01' ===> ts >= TIMESTAMP '2023-01-02 00:00:00' Review Comment: It is because it is easy to get the start-of-day timestamp but not easy to get the end-of-day timestamp. ```sql spark-sql> select cast(date'2023-01-01' as timestamp); 2023-01-01 00:00:00 ``` vs ```sql spark-sql> select cast(concat(cast(date'2023-01-01' as string), ' 23:59:59.99') as timestamp); 2023-01-01 23:59:59.99 ``` It will truncate if casting from long type to timestamp type: ```sql spark-sql> select cast(cast(cast(date'2023-01-01' as timestamp) as long) + (24 * 60 * 60 - 1) as timestamp); 2023-01-01 23:59:59 ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
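A minimal, self-contained sketch (not part of the PR) illustrating why the rewrite quoted above is equivalent: casting a timestamp to a date truncates to the start of the day, so `CAST(ts AS date) > DATE '2023-01-01'` holds exactly when `ts` is at or after the start of the next day. The sample data and the column name `ts` are assumptions for the demo.
```scala
import org.apache.spark.sql.SparkSession

object UnwrapCastDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("unwrap-cast-demo").getOrCreate()
    import spark.implicits._

    val df = Seq(
      "2023-01-01 00:00:00",
      "2023-01-01 23:59:59.999999",
      "2023-01-02 00:00:00"
    ).toDF("s").selectExpr("CAST(s AS TIMESTAMP) AS ts")

    // Original predicate shape: the cast sits on the attribute side (the shape the rule unwraps).
    val original = df.where("CAST(ts AS DATE) > DATE'2023-01-01'")
    // Rewritten predicate: compare against the start of the next day, the only
    // boundary that is cheap to compute.
    val rewritten = df.where("ts >= TIMESTAMP'2023-01-02 00:00:00'")

    // Both select only the row at 2023-01-02 00:00:00.
    assert(original.collect().sameElements(rewritten.collect()))
    spark.stop()
  }
}
```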
[GitHub] [spark] ueshin commented on a diff in pull request #40260: [SPARK-42630][CONNECT][PYTHON] Delay parsing DDL string until SparkConnectClient is available
ueshin commented on code in PR #40260: URL: https://github.com/apache/spark/pull/40260#discussion_r1123979423 ## python/pyspark/sql/connect/_typing.py: ## @@ -57,7 +57,7 @@ class UserDefinedFunctionLike(Protocol): deterministic: bool @property -def returnType(self) -> DataType: +def returnType(self) -> DataTypeOrString: Review Comment: Yes, that's true. However, the client is not available here and we don't have a proper way to parse the string here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] ulysses-you commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache
ulysses-you commented on code in PR #39459: URL: https://github.com/apache/spark/pull/39459#discussion_r1123978338 ## core/src/main/scala/org/apache/spark/internal/config/package.scala: ## @@ -2468,4 +2468,15 @@ package object config { .version("3.4.0") .booleanConf .createWithDefault(false) + + private[spark] val RDD_CACHE_VISIBILITY_TRACKING_ENABLED = +ConfigBuilder("spark.rdd.cache.visibilityTracking.enabled") + .internal() + .doc("Set to be true to enabled RDD cache block's visibility status. Once it's enabled," + +" a RDD cache block can be used only when it's marked as visible. And a RDD block will be" + +" marked as visible only when one of the tasks generating the cache block finished" + +" successfully. This is relevant in context of consistent accumulator status.") + .version("3.4.0") Review Comment: According to the commit history, it seems this PR lands on the master branch (3.5.0). Should it be backported to 3.4.0, or should the version here be updated? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
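For reference, a minimal sketch of how the flag quoted above would be enabled. The config key comes from the diff; the app name, master, and workload are placeholder assumptions, and the config is marked internal, so this is purely illustrative.
```scala
import org.apache.spark.{SparkConf, SparkContext}

object VisibilityTrackingDemo {
  def main(args: Array[String]): Unit = {
    // Enable RDD cache visibility tracking (the internal flag under discussion above).
    val conf = new SparkConf()
      .setAppName("rdd-cache-visibility-demo")
      .setMaster("local[2]")
      .set("spark.rdd.cache.visibilityTracking.enabled", "true")
    val sc = new SparkContext(conf)
    try {
      val cached = sc.parallelize(1 to 100).cache()
      // First action materializes the cache; per the doc text above, a block becomes
      // visible once one of the tasks generating it finishes successfully.
      cached.count()
    } finally {
      sc.stop()
    }
  }
}
```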
[GitHub] [spark] xinrong-meng commented on a diff in pull request #40260: [SPARK-42630][CONNECT][PYTHON] Delay parsing DDL string until SparkConnectClient is available
xinrong-meng commented on code in PR #40260: URL: https://github.com/apache/spark/pull/40260#discussion_r1123976155 ## python/pyspark/sql/connect/_typing.py: ## @@ -57,7 +57,7 @@ class UserDefinedFunctionLike(Protocol): deterministic: bool @property -def returnType(self) -> DataType: +def returnType(self) -> DataTypeOrString: Review Comment: Oh yes, I commented separately below. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] xinrong-meng commented on a diff in pull request #40260: [SPARK-42630][CONNECT][PYTHON] Delay parsing DDL string until SparkConnectClient is available
xinrong-meng commented on code in PR #40260: URL: https://github.com/apache/spark/pull/40260#discussion_r1123974740 ## python/pyspark/sql/connect/udf.py: ## @@ -99,9 +97,7 @@ def __init__( ) self.func = func -self._returnType = ( -parse_data_type(returnType) if isinstance(returnType, str) else returnType -) +self._returnType = returnType Review Comment: In vanilla PySpark, a `returnType` property is defined at https://github.com/apache/spark/blob/master/python/pyspark/sql/udf.py#L233, which always returns a `DataType`. In the existing Connect code, `wrapper.returnType = self._returnType` is used instead. I am afraid this line of change may break that property parity. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] HyukjinKwon commented on pull request #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client
HyukjinKwon commented on PR #40257: URL: https://github.com/apache/spark/pull/40257#issuecomment-1452877870 Seems like tests did not pass, and it fails in the master branch (https://github.com/apache/spark/actions/runs/4319488463/jobs/7538760733). Let me quickly revert this for now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] zhengruifeng commented on a diff in pull request #40260: [SPARK-42630][CONNECT][PYTHON] Delay parsing DDL string until SparkConnectClient is available
zhengruifeng commented on code in PR #40260: URL: https://github.com/apache/spark/pull/40260#discussion_r1123974022 ## python/pyspark/sql/connect/_typing.py: ## @@ -57,7 +57,7 @@ class UserDefinedFunctionLike(Protocol): deterministic: bool @property -def returnType(self) -> DataType: +def returnType(self) -> DataTypeOrString: Review Comment: does this introduce a behavior change? https://github.com/apache/spark/blob/6ff760d483124b121d79c3a2d5fdc3ee3f27dd00/python/pyspark/sql/_typing.pyi#L70-L77 It seems that the PySpark UDF's `returnType` is always a `DataType`. cc @xinrong-meng -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] zhengruifeng commented on pull request #40261: [SPARK-42615][CONNECT][FOLLOWUP] Fix SparkConnectAnalyzeHandler to use withActive
zhengruifeng commented on PR #40261: URL: https://github.com/apache/spark/pull/40261#issuecomment-1452864911 thank you, merged into master/3.4 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] zhengruifeng closed pull request #40261: [SPARK-42615][CONNECT][FOLLOWUP] Fix SparkConnectAnalyzeHandler to use withActive
zhengruifeng closed pull request #40261: [SPARK-42615][CONNECT][FOLLOWUP] Fix SparkConnectAnalyzeHandler to use withActive URL: https://github.com/apache/spark/pull/40261 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] amaliujia commented on a diff in pull request #40256: [SPARK-42653][CONNECT] Artifact transfer from Scala/JVM client to Server
amaliujia commented on code in PR #40256: URL: https://github.com/apache/spark/pull/40256#discussion_r1123965938 ## connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala: ## @@ -0,0 +1,305 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.connect.client + +import java.io.InputStream +import java.net.URI +import java.nio.file.{Files, Path, Paths} +import java.util.zip.{CheckedInputStream, CRC32} + +import scala.collection.mutable +import scala.concurrent.Promise +import scala.concurrent.duration.Duration +import scala.util.control.NonFatal + +import Artifact._ +import com.google.protobuf.ByteString +import io.grpc.ManagedChannel +import io.grpc.stub.StreamObserver + +import org.apache.spark.connect.proto +import org.apache.spark.connect.proto.AddArtifactsResponse +import org.apache.spark.connect.proto.AddArtifactsResponse.ArtifactSummary +import org.apache.spark.util.{ThreadUtils, Utils} + +/** + * The Artifact Manager is responsible for handling and transferring artifacts from the local + * client to the server (local/remote). + * @param userContext + * @param channel + */ +class ArtifactManager(userContext: proto.UserContext, channel: ManagedChannel) { + // Using the midpoint recommendation of 32KiB for chunk size as specified in + // https://github.com/grpc/grpc.github.io/issues/371. + private val CHUNK_SIZE: Int = 32 * 1024 + + private[this] val stub = proto.SparkConnectServiceGrpc.newStub(channel) + + /** + * Add a single artifact to the session. + * + * Currently only local files with extensions .jar and .class are supported. + */ + def addArtifact(path: String): Unit = { +addArtifact(Utils.resolveURI(path)) + } + + private def parseArtifacts(uri: URI): Seq[Artifact] = { +// Currently only local files with extensions .jar and .class are supported. +uri.getScheme match { + case "file" => +val path = Paths.get(uri) +val artifact = path.getFileName.toString match { + case jar if jar.endsWith(".jar") => +newJarArtifact(path.getFileName, new LocalFile(path)) + case cf if cf.endsWith(".class") => +newClassArtifact(path.getFileName, new LocalFile(path)) + case other => +throw new UnsupportedOperationException(s"Unsuppoted file format: $other") +} +Seq[Artifact](artifact) + + case other => +throw new UnsupportedOperationException(s"Unsupported scheme: $other") +} + } + + /** + * Add a single artifact to the session. + * + * Currently only local files with extensions .jar and .class are supported. + */ + def addArtifact(uri: URI): Unit = addArtifacts(parseArtifacts(uri)) + + /** + * Add multiple artifacts to the session. + * + * Currently only local files with extensions .jar and .class are supported. 
+ */ + def addArtifacts(uris: Seq[URI]): Unit = addArtifacts(uris.flatMap(parseArtifacts)) + + /** + * Add a number of artifacts to the session. + */ + private def addArtifacts(artifacts: Iterable[Artifact]): Unit = { +val promise = Promise[Seq[ArtifactSummary]] +val responseHandler = new StreamObserver[proto.AddArtifactsResponse] { + private val summaries = mutable.Buffer.empty[ArtifactSummary] + override def onNext(v: AddArtifactsResponse): Unit = { +v.getArtifactsList.forEach { summary => + summaries += summary +} + } + override def onError(throwable: Throwable): Unit = { +promise.failure(throwable) + } + override def onCompleted(): Unit = { +promise.success(summaries.toSeq) + } +} +val stream = stub.addArtifacts(responseHandler) +val currentBatch = mutable.Buffer.empty[Artifact] +var currentBatchSize = 0L + +def addToBatch(dep: Artifact, size: Long): Unit = { + currentBatch += dep + currentBatchSize += size +} + +def writeBatch(): Unit = { + addBatchedArtifacts(currentBatch.toSeq, stream) + currentBatch.clear() + currentBatchSize = 0 +} + +artifacts.iterator.foreach { artifact => + val data = artifact.storage + val size = data.size
[GitHub] [spark] LuciferYang commented on pull request #40213: [SPARK-42599][CONNECT][INFRA] Introduce `dev/connect-jvm-client-mima-check` instead of `CompatibilitySuite`
LuciferYang commented on PR #40213: URL: https://github.com/apache/spark/pull/40213#issuecomment-1452840888 Thanks @hvanhovell @amaliujia @zhenlineo -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] hvanhovell closed pull request #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client
hvanhovell closed pull request #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client URL: https://github.com/apache/spark/pull/40257 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] hvanhovell commented on pull request #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client
hvanhovell commented on PR #40257: URL: https://github.com/apache/spark/pull/40257#issuecomment-1452827799 Merging this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] ueshin commented on a diff in pull request #40238: [SPARK-42633][CONNECT] Make LocalRelation take an actual schema
ueshin commented on code in PR #40238: URL: https://github.com/apache/spark/pull/40238#discussion_r1123930392 ## connector/connect/common/src/main/protobuf/spark/connect/relations.proto: ## @@ -353,11 +353,16 @@ message LocalRelation { optional bytes data = 1; // (Optional) The schema of local data. - // It should be either a DDL-formatted type string or a JSON string. // // The server side will update the column names and data types according to this schema. // If the 'data' is not provided, then this schema will be required. - optional string schema = 2; + oneof schema { Review Comment: Good point. IIRC, there was a decision not to implement it in Python. cc @HyukjinKwon @zhengruifeng -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] hvanhovell commented on a diff in pull request #40238: [SPARK-42633][CONNECT] Make LocalRelation take an actual schema
hvanhovell commented on code in PR #40238: URL: https://github.com/apache/spark/pull/40238#discussion_r1123926302 ## connector/connect/common/src/main/protobuf/spark/connect/relations.proto: ## @@ -353,11 +353,16 @@ message LocalRelation { optional bytes data = 1; // (Optional) The schema of local data. - // It should be either a DDL-formatted type string or a JSON string. // // The server side will update the column names and data types according to this schema. // If the 'data' is not provided, then this schema will be required. - optional string schema = 2; + oneof schema { Review Comment: @ueshin why not implement a parser on the python side? That shouldn't be too hard, and it saves you from doing a lot of expensive rpcs. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] srowen commented on pull request #40220: [SPARK-42647][PYTHON] Change alias for numpy deprecated and removed types
srowen commented on PR #40220: URL: https://github.com/apache/spark/pull/40220#issuecomment-1452779722 Merged to master/3.4/3.3, for consistency with https://issues.apache.org/jira/browse/SPARK-40376 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] srowen closed pull request #40220: [SPARK-42647][PYTHON] Change alias for numpy deprecated and removed types
srowen closed pull request #40220: [SPARK-42647][PYTHON] Change alias for numpy deprecated and removed types URL: https://github.com/apache/spark/pull/40220 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] ueshin commented on a diff in pull request #40238: [SPARK-42633][CONNECT] Make LocalRelation take an actual schema
ueshin commented on code in PR #40238: URL: https://github.com/apache/spark/pull/40238#discussion_r1123923286 ## connector/connect/common/src/main/protobuf/spark/connect/relations.proto: ## @@ -353,11 +353,16 @@ message LocalRelation { optional bytes data = 1; // (Optional) The schema of local data. - // It should be either a DDL-formatted type string or a JSON string. // // The server side will update the column names and data types according to this schema. // If the 'data' is not provided, then this schema will be required. - optional string schema = 2; + oneof schema { Review Comment: I think this change is fine. I'd rather like to change all places to accept both string and `DataType` and the choice is up to the client developers. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] ueshin commented on a diff in pull request #40238: [SPARK-42633][CONNECT] Make LocalRelation take an actual schema
ueshin commented on code in PR #40238: URL: https://github.com/apache/spark/pull/40238#discussion_r1123920584 ## connector/connect/common/src/main/protobuf/spark/connect/relations.proto: ## @@ -353,11 +353,16 @@ message LocalRelation { optional bytes data = 1; // (Optional) The schema of local data. - // It should be either a DDL-formatted type string or a JSON string. // // The server side will update the column names and data types according to this schema. // If the 'data' is not provided, then this schema will be required. - optional string schema = 2; + oneof schema { Review Comment: In some cases we need to parse it beforehand anyway. For the case of #40240, we can't create a converter from the local Python object to an Arrow table without the schema as a `DataType` object. Another example is UDFs: we need to pickle the function and its return type as a `DataType` object. We can't generate the Python `DataType` object and pickle it into the `command` field on the server side anymore, so we need to parse the DDL string beforehand. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] ivoson commented on pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache
ivoson commented on PR #39459: URL: https://github.com/apache/spark/pull/39459#issuecomment-1452774796 Thanks for the review and help on this. @mridulm @Ngone51 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] zhenlineo commented on a diff in pull request #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client
zhenlineo commented on code in PR #40257: URL: https://github.com/apache/spark/pull/40257#discussion_r1123913129 ## bin/spark-connect-scala-client.sc: ## @@ -0,0 +1,15 @@ +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.SparkSession + +val conStr = if (sys.env.contains("SPARK_REMOTE")) sys.env("SPARK_REMOTE") else "" Review Comment: This feature request is a bit hard :D I cannot figure out a way to pass the args to the program rather than to the Scala REPL. I think it relates to '-i': we need '-i' to keep the REPL running, but then the REPL treats all args as its own rather than the script's. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] zhenlineo commented on a diff in pull request #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client
zhenlineo commented on code in PR #40257: URL: https://github.com/apache/spark/pull/40257#discussion_r1123911014 ## bin/spark-connect-scala-client: ## @@ -0,0 +1,26 @@ +#!/usr/bin/env bash + +# Use the spark connect JVM client to connect to a spark connect server. +# +# Start a local server: +# A local spark-connect server with default settings can be started using the following command: +# `bin/spark-connect` +# The client should be able to connect to this server directly with the default client settings. +# +# Connect to a remote server: +# To connect to a remote server, use env var `SPARK_REMOTE` to configure the client connection +# string. e.g. +# `export SPARK_REMOTE="sc://:/;token=;="` + +if [ -z "${SPARK_HOME}" ]; then + source "$(dirname "$0")"/find-spark-home +fi + +# Build the jars needed for spark connect JVM client +build/sbt "sql/package;connect-client-jvm/assembly" + +CONNECT_CLASSPATH="$(build/sbt -DcopyDependencies=false "export connect-client-jvm/fullClasspath" | grep jar | tail -n1)" +SQL_CLASSPATH="$(build/sbt -DcopyDependencies=false "export sql/fullClasspath" | grep jar | tail -n1)" + +INIT_SCRIPT="${SPARK_HOME}"/bin/spark-connect-scala-client.sc +"${SPARK_HOME}"/build/scala*/bin/scala -cp "$CONNECT_CLASSPATH:$SQL_CLASSPATH" -i $INIT_SCRIPT Review Comment: Now it will find the version set in the top pom file. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] zhenlineo commented on a diff in pull request #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client
zhenlineo commented on code in PR #40257: URL: https://github.com/apache/spark/pull/40257#discussion_r1123910664 ## bin/spark-connect-scala-client.sc: ## @@ -0,0 +1,15 @@ +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.SparkSession + +val conStr = if (sys.env.contains("SPARK_REMOTE")) sys.env("SPARK_REMOTE") else "" +val sessionBuilder = SparkSession.builder() +val spark = if (conStr.isEmpty) sessionBuilder.build() else sessionBuilder.remote(conStr).build() Review Comment: The user agent, if not set, defaults to "_SPARK_CONNECT_SCALA". Do you want another name for the script client? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] beliefer commented on pull request #40252: [SPARK-42555][CONNECT] Add JDBC to DataFrameReader
beliefer commented on PR #40252: URL: https://github.com/apache/spark/pull/40252#issuecomment-1452763000 > I guess you can refer to `JDBCSuite` and `ClientE2ETestSuite` ? The built-in H2 runs on the server side, and we can't start an H2 database through the Connect API. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
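For readers following the JDBC discussion, a hedged sketch of the call shape that #40252 brings to the Connect client, mirroring the vanilla `DataFrameReader.jdbc` API. The H2 URL, table name, and credentials are placeholder assumptions, and, as noted above, the driver and database must be reachable from the server side.
```scala
import java.util.Properties

import org.apache.spark.sql.{DataFrame, SparkSession}

object JdbcReadSketch {
  // Reads a table over JDBC; connection details here are illustrative only.
  def readPeople(spark: SparkSession): DataFrame = {
    val props = new Properties()
    props.setProperty("user", "sa")
    props.setProperty("password", "")
    spark.read.jdbc("jdbc:h2:mem:testdb;DB_CLOSE_DELAY=-1", "TEST.PEOPLE", props)
  }
}
```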
[GitHub] [spark] ueshin commented on a diff in pull request #40210: [SPARK-42615][CONNECT][PYTHON] Refactor the AnalyzePlan RPC and add `session.version`
ueshin commented on code in PR #40210: URL: https://github.com/apache/spark/pull/40210#discussion_r1123898851 ## connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAnalyzeHandler.scala: ## @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connect.service + +import scala.collection.JavaConverters._ + +import io.grpc.stub.StreamObserver + +import org.apache.spark.connect.proto +import org.apache.spark.internal.Logging +import org.apache.spark.sql.{Dataset, SparkSession} +import org.apache.spark.sql.connect.common.{DataTypeProtoConverter, InvalidPlanInput} +import org.apache.spark.sql.connect.planner.SparkConnectPlanner +import org.apache.spark.sql.execution.{CodegenMode, CostMode, ExtendedMode, FormattedMode, SimpleMode} + +private[connect] class SparkConnectAnalyzeHandler( +responseObserver: StreamObserver[proto.AnalyzePlanResponse]) +extends Logging { + + def handle(request: proto.AnalyzePlanRequest): Unit = { +val session = + SparkConnectService +.getOrCreateIsolatedSession(request.getUserContext.getUserId, request.getClientId) +.session + +val response = process(request, session) +responseObserver.onNext(response) +responseObserver.onCompleted() Review Comment: #40261 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] ueshin opened a new pull request, #40261: [SPARK-42615][CONNECT][FOLLOWUP] Fix SparkConnectAnalyzeHandler to use withActive
ueshin opened a new pull request, #40261: URL: https://github.com/apache/spark/pull/40261 ### What changes were proposed in this pull request? Fix `SparkConnectAnalyzeHandler` to use `withActive`. ### Why are the changes needed? Similar to #40165, `SQLConf.get` is necessary when transforming the proto to plans. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing tests. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] vicennial commented on a diff in pull request #40256: [SPARK-42653][CONNECT] Artifact transfer from Scala/JVM client to Server
vicennial commented on code in PR #40256: URL: https://github.com/apache/spark/pull/40256#discussion_r1123897616 ## connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/ArtifactSuite.scala: ## @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.connect.client + +import java.io.InputStream +import java.nio.file.{Files, Path, Paths} +import java.util.concurrent.TimeUnit +import java.util.zip.{CheckedInputStream, CRC32} + +import com.google.protobuf.ByteString +import io.grpc.{ManagedChannel, Server} +import io.grpc.inprocess.{InProcessChannelBuilder, InProcessServerBuilder} +import org.scalatest.BeforeAndAfterEach + +import org.apache.spark.connect.proto +import org.apache.spark.connect.proto.AddArtifactsRequest +import org.apache.spark.sql.connect.client.util.ConnectFunSuite + +class ArtifactSuite extends ConnectFunSuite with BeforeAndAfterEach { + + private var client: SparkConnectClient = _ + private var service: DummySparkConnectService = _ + private var server: Server = _ + private var artifactManager: ArtifactManager = _ + private var channel: ManagedChannel = _ + + private def startDummyServer(): Unit = { +service = new DummySparkConnectService() +server = InProcessServerBuilder + .forName(getClass.getName) + .addService(service) + .build() +server.start() + } + + private def createArtifactManager(): Unit = { +channel = InProcessChannelBuilder.forName(getClass.getName).directExecutor().build() +artifactManager = new ArtifactManager(proto.UserContext.newBuilder().build(), channel) + } + + override def beforeEach(): Unit = { +super.beforeEach() +startDummyServer() +createArtifactManager() +client = null + } + + override def afterEach(): Unit = { +if (server != null) { + server.shutdownNow() + assert(server.awaitTermination(5, TimeUnit.SECONDS), "server failed to shutdown") +} + +if (channel != null) { + channel.shutdownNow() +} + +if (client != null) { + client.shutdown() +} + } + + private val CHUNK_SIZE: Int = 32 * 1024 + protected def artifactFilePath: Path = baseResourcePath.resolve("artifact-tests") + + /** + * Check if the data sent to the server (stored in `artifactChunk`) is equivalent to the local + * data at `localPath`. 
+ * @param artifactChunk + * @param localPath + */ + private def assertFileDataEquality( + artifactChunk: AddArtifactsRequest.ArtifactChunk, + localPath: Path): Unit = { +val in = new CheckedInputStream(Files.newInputStream(localPath), new CRC32) +val localData = ByteString.readFrom(in) +assert(artifactChunk.getData == localData) +assert(artifactChunk.getCrc == in.getChecksum.getValue) + } + + private def singleChunkArtifactTest(path: String): Unit = { +test(s"Single Chunk Artifact - $path") { + val artifactPath = artifactFilePath.resolve(path) + artifactManager.addArtifact(artifactPath.toString) + + val receivedRequests = service.getAndClearLatestAddArtifactRequests() + // Single `AddArtifactRequest` + assert(receivedRequests.size == 1) + + val request = receivedRequests.head + assert(request.hasBatch) + + val batch = request.getBatch + // Single artifact in batch + assert(batch.getArtifactsList.size() == 1) + + val singleChunkArtifact = batch.getArtifacts(0) + val namePrefix = artifactPath.getFileName.toString match { +case jar if jar.endsWith(".jar") => "jars" +case cf if cf.endsWith(".class") => "classes" + } + assert(singleChunkArtifact.getName.equals(namePrefix + "/" + path)) + assertFileDataEquality(singleChunkArtifact.getData, artifactPath) +} + } + + singleChunkArtifactTest("smallClassFile.class") + + singleChunkArtifactTest("smallJar.jar") + + private def readNextChunk(in: InputStream): ByteString = { +val buf = new Array[Byte](CHUNK_SIZE) +var bytesRead = 0 +var count = 0 +while (count != -1 && bytesRead < CHUNK_SIZE) { + count = in.read(buf, bytesRead, CHUNK_SIZE - bytesRead) + if (count != -1) { +bytesRead += count + } +} +
[GitHub] [spark] mridulm commented on pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache
mridulm commented on PR #39459: URL: https://github.com/apache/spark/pull/39459#issuecomment-1452754371 Merged to master. Thanks for fixing this @ivoson ! Thanks for the reviews @Ngone51 :-) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] mridulm closed pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache
mridulm closed pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache URL: https://github.com/apache/spark/pull/39459 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] ueshin commented on a diff in pull request #40210: [SPARK-42615][CONNECT][PYTHON] Refactor the AnalyzePlan RPC and add `session.version`
ueshin commented on code in PR #40210: URL: https://github.com/apache/spark/pull/40210#discussion_r1123891426 ## connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectAnalyzeHandler.scala: ## @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connect.service + +import scala.collection.JavaConverters._ + +import io.grpc.stub.StreamObserver + +import org.apache.spark.connect.proto +import org.apache.spark.internal.Logging +import org.apache.spark.sql.{Dataset, SparkSession} +import org.apache.spark.sql.connect.common.{DataTypeProtoConverter, InvalidPlanInput} +import org.apache.spark.sql.connect.planner.SparkConnectPlanner +import org.apache.spark.sql.execution.{CodegenMode, CostMode, ExtendedMode, FormattedMode, SimpleMode} + +private[connect] class SparkConnectAnalyzeHandler( +responseObserver: StreamObserver[proto.AnalyzePlanResponse]) +extends Logging { + + def handle(request: proto.AnalyzePlanRequest): Unit = { +val session = + SparkConnectService +.getOrCreateIsolatedSession(request.getUserContext.getUserId, request.getClientId) +.session + +val response = process(request, session) +responseObserver.onNext(response) +responseObserver.onCompleted() Review Comment: I guess we should surround this with `session.withActive`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
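For clarity, a sketch of the suggested fix (later sent as #40261): wrapping the processing in `session.withActive` so that `SQLConf.get` resolves to this session's configuration while the proto is transformed into a plan. The method and field names are taken from the handler quoted above.
```scala
// Inside SparkConnectAnalyzeHandler (sketch of the suggested change):
def handle(request: proto.AnalyzePlanRequest): Unit = {
  val session =
    SparkConnectService
      .getOrCreateIsolatedSession(request.getUserContext.getUserId, request.getClientId)
      .session

  // withActive makes `session` the active session on the current thread, so
  // SQLConf.get inside `process` sees this session's conf.
  val response = session.withActive {
    process(request, session)
  }
  responseObserver.onNext(response)
  responseObserver.onCompleted()
}
```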
[GitHub] [spark] ueshin opened a new pull request, #40260: [SPARK-42630][CONNECT][PYTHON] Delay parsing DDL string until SparkConnectClient is available
ueshin opened a new pull request, #40260: URL: https://github.com/apache/spark/pull/40260 ### What changes were proposed in this pull request? Delays parsing DDL string for Python UDFs until `SparkConnectClient` is available. Also changes `createDataFrame` to use the proto `DDLParse`. ### Why are the changes needed? Currently `parse_data_type` depends on `PySparkSession`, which creates a local PySpark session, but that won't be available on the client side. When `SparkConnectClient` is available, we can use the new proto `DDLParse` to parse data types given as strings. ### Does this PR introduce _any_ user-facing change? The UDF's `returnType` attribute could be a string in Spark Connect if it is provided as a string. ### How was this patch tested? Existing tests. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] vicennial commented on a diff in pull request #40256: [SPARK-42653][CONNECT] Artifact transfer from Scala/JVM client to Server
vicennial commented on code in PR #40256: URL: https://github.com/apache/spark/pull/40256#discussion_r1123833306 ## connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala: ## @@ -0,0 +1,336 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.connect.client + +import java.io.{ByteArrayInputStream, InputStream} +import java.net.URI +import java.nio.file.{Files, Path, Paths} +import java.util.concurrent.CopyOnWriteArrayList +import java.util.zip.{CheckedInputStream, CRC32} + +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.concurrent.Promise +import scala.concurrent.duration.Duration +import scala.util.control.NonFatal + +import Artifact._ +import com.google.protobuf.ByteString +import io.grpc.ManagedChannel +import io.grpc.stub.StreamObserver + +import org.apache.spark.connect.proto +import org.apache.spark.connect.proto.AddArtifactsResponse +import org.apache.spark.connect.proto.AddArtifactsResponse.ArtifactSummary +import org.apache.spark.util.{ThreadUtils, Utils} + +/** + * The Artifact Manager is responsible for handling and transferring artifacts from the local + * client to the server (local/remote). + * @param userContext + * @param channel + */ +class ArtifactManager(userContext: proto.UserContext, channel: ManagedChannel) { + // Using the midpoint recommendation of 32KiB for chunk size as specified in + // https://github.com/grpc/grpc.github.io/issues/371. + private val CHUNK_SIZE: Int = 32 * 1024 + + private[this] val stub = proto.SparkConnectServiceGrpc.newStub(channel) + private[this] val classFinders = new CopyOnWriteArrayList[ClassFinder] + + /** + * Register a [[ClassFinder]] for dynamically generated classes. + */ + def register(finder: ClassFinder): Unit = classFinders.add(finder) + + /** + * Add a single artifact to the session. + * + * Currently only local files with extensions .jar and .class are supported. + */ + def addArtifact(path: String): Unit = { +addArtifact(Utils.resolveURI(path)) + } + + private def parseArtifacts(uri: URI): Seq[Artifact] = { +// Currently only local files with extensions .jar and .class are supported. +uri.getScheme match { + case "file" => +val path = Paths.get(uri) +val artifact = path.getFileName.toString match { + case jar if jar.endsWith(".jar") => +newJarArtifact(path.getFileName, new LocalFile(path)) + case cf if cf.endsWith(".class") => +newClassArtifact(path.getFileName, new LocalFile(path)) + case other => +throw new UnsupportedOperationException(s"Unsuppoted file format: $other") +} +Seq[Artifact](artifact) + + case other => +throw new UnsupportedOperationException(s"Unsupported scheme: $other") +} + } + + /** + * Add a single artifact to the session. 
+ * + * Currently only local files with extensions .jar and .class are supported. + */ + def addArtifact(uri: URI): Unit = addArtifacts(parseArtifacts(uri)) + + /** + * Ensure that all classfile artifacts have been uploaded to the server, and are ready for use. + */ + private[client] def ensureAllClassFileArtifactsUploaded(): Unit = { +addArtifacts(classFinders.asScala.flatMap(_.findClasses())) + } + + /** + * Add multiple artifacts to the session. + * + * Currently only local files with extensions .jar and .class are supported. + */ + def addArtifacts(uris: Seq[URI]): Unit = addArtifacts(uris.flatMap(parseArtifacts)) + + /** + * Add a number of artifacts to the session. + */ + private def addArtifacts(artifacts: Iterable[Artifact]): Unit = { +val promise = Promise[Seq[ArtifactSummary]] +val responseHandler = new StreamObserver[proto.AddArtifactsResponse] { + private val summaries = mutable.Buffer.empty[ArtifactSummary] + override def onNext(v: AddArtifactsResponse): Unit = { +v.getArtifactsList.forEach { summary => + summaries += summary +} + } + override def onError(throwable: Throwable): Unit = { +promise.failure(throwable) + } + override def onComple
[GitHub] [spark] vicennial commented on a diff in pull request #40256: [SPARK-42653][CONNECT] Artifact transfer from Scala/JVM client to Server
vicennial commented on code in PR #40256: URL: https://github.com/apache/spark/pull/40256#discussion_r1123829338 ## connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala: ## @@ -0,0 +1,336 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.connect.client + +import java.io.{ByteArrayInputStream, InputStream} +import java.net.URI +import java.nio.file.{Files, Path, Paths} +import java.util.concurrent.CopyOnWriteArrayList +import java.util.zip.{CheckedInputStream, CRC32} + +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.concurrent.Promise +import scala.concurrent.duration.Duration +import scala.util.control.NonFatal + +import Artifact._ +import com.google.protobuf.ByteString +import io.grpc.ManagedChannel +import io.grpc.stub.StreamObserver + +import org.apache.spark.connect.proto +import org.apache.spark.connect.proto.AddArtifactsResponse +import org.apache.spark.connect.proto.AddArtifactsResponse.ArtifactSummary +import org.apache.spark.util.{ThreadUtils, Utils} + +/** + * The Artifact Manager is responsible for handling and transferring artifacts from the local + * client to the server (local/remote). + * @param userContext + * @param channel + */ +class ArtifactManager(userContext: proto.UserContext, channel: ManagedChannel) { + // Using the midpoint recommendation of 32KiB for chunk size as specified in + // https://github.com/grpc/grpc.github.io/issues/371. + private val CHUNK_SIZE: Int = 32 * 1024 + + private[this] val stub = proto.SparkConnectServiceGrpc.newStub(channel) + private[this] val classFinders = new CopyOnWriteArrayList[ClassFinder] + + /** + * Register a [[ClassFinder]] for dynamically generated classes. + */ + def register(finder: ClassFinder): Unit = classFinders.add(finder) + + /** + * Add a single artifact to the session. + * + * Currently only local files with extensions .jar and .class are supported. + */ + def addArtifact(path: String): Unit = { +addArtifact(Utils.resolveURI(path)) + } + + private def parseArtifacts(uri: URI): Seq[Artifact] = { +// Currently only local files with extensions .jar and .class are supported. +uri.getScheme match { + case "file" => +val path = Paths.get(uri) +val artifact = path.getFileName.toString match { + case jar if jar.endsWith(".jar") => +newJarArtifact(path.getFileName, new LocalFile(path)) + case cf if cf.endsWith(".class") => +newClassArtifact(path.getFileName, new LocalFile(path)) + case other => +throw new UnsupportedOperationException(s"Unsuppoted file format: $other") +} +Seq[Artifact](artifact) + + case other => +throw new UnsupportedOperationException(s"Unsupported scheme: $other") +} + } + + /** + * Add a single artifact to the session. 
+ * + * Currently only local files with extensions .jar and .class are supported. + */ + def addArtifact(uri: URI): Unit = addArtifacts(parseArtifacts(uri)) + + /** + * Ensure that all classfile artifacts have been uploaded to the server, and are ready for use. + */ + private[client] def ensureAllClassFileArtifactsUploaded(): Unit = { +addArtifacts(classFinders.asScala.flatMap(_.findClasses())) + } + + /** + * Add multiple artifacts to the session. + * + * Currently only local files with extensions .jar and .class are supported. + */ + def addArtifacts(uris: Seq[URI]): Unit = addArtifacts(uris.flatMap(parseArtifacts)) + + /** + * Add a number of artifacts to the session. + */ + private def addArtifacts(artifacts: Iterable[Artifact]): Unit = { +val promise = Promise[Seq[ArtifactSummary]] +val responseHandler = new StreamObserver[proto.AddArtifactsResponse] { + private val summaries = mutable.Buffer.empty[ArtifactSummary] + override def onNext(v: AddArtifactsResponse): Unit = { +v.getArtifactsList.forEach { summary => + summaries += summary +} + } + override def onError(throwable: Throwable): Unit = { +promise.failure(throwable) + } + override def onComple
[GitHub] [spark] vicennial commented on a diff in pull request #40256: [SPARK-42653][CONNECT] Artifact transfer from Scala/JVM client to Server
vicennial commented on code in PR #40256: URL: https://github.com/apache/spark/pull/40256#discussion_r1123821569 ## connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala: ## @@ -0,0 +1,336 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.connect.client + +import java.io.{ByteArrayInputStream, InputStream} +import java.net.URI +import java.nio.file.{Files, Path, Paths} +import java.util.concurrent.CopyOnWriteArrayList +import java.util.zip.{CheckedInputStream, CRC32} + +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.concurrent.Promise +import scala.concurrent.duration.Duration +import scala.util.control.NonFatal + +import Artifact._ +import com.google.protobuf.ByteString +import io.grpc.ManagedChannel +import io.grpc.stub.StreamObserver + +import org.apache.spark.connect.proto +import org.apache.spark.connect.proto.AddArtifactsResponse +import org.apache.spark.connect.proto.AddArtifactsResponse.ArtifactSummary +import org.apache.spark.util.{ThreadUtils, Utils} + +/** + * The Artifact Manager is responsible for handling and transferring artifacts from the local + * client to the server (local/remote). + * @param userContext + * @param channel + */ +class ArtifactManager(userContext: proto.UserContext, channel: ManagedChannel) { + // Using the midpoint recommendation of 32KiB for chunk size as specified in + // https://github.com/grpc/grpc.github.io/issues/371. + private val CHUNK_SIZE: Int = 32 * 1024 + + private[this] val stub = proto.SparkConnectServiceGrpc.newStub(channel) + private[this] val classFinders = new CopyOnWriteArrayList[ClassFinder] + + /** + * Register a [[ClassFinder]] for dynamically generated classes. + */ + def register(finder: ClassFinder): Unit = classFinders.add(finder) + + /** + * Add a single artifact to the session. + * + * Currently only local files with extensions .jar and .class are supported. + */ + def addArtifact(path: String): Unit = { +addArtifact(Utils.resolveURI(path)) + } + + private def parseArtifacts(uri: URI): Seq[Artifact] = { +// Currently only local files with extensions .jar and .class are supported. +uri.getScheme match { + case "file" => +val path = Paths.get(uri) +val artifact = path.getFileName.toString match { + case jar if jar.endsWith(".jar") => +newJarArtifact(path.getFileName, new LocalFile(path)) + case cf if cf.endsWith(".class") => +newClassArtifact(path.getFileName, new LocalFile(path)) + case other => +throw new UnsupportedOperationException(s"Unsuppoted file format: $other") +} +Seq[Artifact](artifact) + + case other => +throw new UnsupportedOperationException(s"Unsupported scheme: $other") +} + } + + /** + * Add a single artifact to the session. 
+ * + * Currently only local files with extensions .jar and .class are supported. + */ + def addArtifact(uri: URI): Unit = addArtifacts(parseArtifacts(uri)) + + /** + * Ensure that all classfile artifacts have been uploaded to the server, and are ready for use. + */ + private[client] def ensureAllClassFileArtifactsUploaded(): Unit = { +addArtifacts(classFinders.asScala.flatMap(_.findClasses())) + } + + /** + * Add multiple artifacts to the session. + * + * Currently only local files with extensions .jar and .class are supported. + */ + def addArtifacts(uris: Seq[URI]): Unit = addArtifacts(uris.flatMap(parseArtifacts)) + + /** + * Add a number of artifacts to the session. + */ + private def addArtifacts(artifacts: Iterable[Artifact]): Unit = { +val promise = Promise[Seq[ArtifactSummary]] +val responseHandler = new StreamObserver[proto.AddArtifactsResponse] { + private val summaries = mutable.Buffer.empty[ArtifactSummary] + override def onNext(v: AddArtifactsResponse): Unit = { +v.getArtifactsList.forEach { summary => + summaries += summary +} + } + override def onError(throwable: Throwable): Unit = { +promise.failure(throwable) + } + override def onComple
[GitHub] [spark] vicennial commented on a diff in pull request #40256: [SPARK-42653][CONNECT] Artifact transfer from Scala/JVM client to Server
vicennial commented on code in PR #40256: URL: https://github.com/apache/spark/pull/40256#discussion_r1123821210 ## connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala: ## @@ -0,0 +1,336 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.connect.client + +import java.io.{ByteArrayInputStream, InputStream} +import java.net.URI +import java.nio.file.{Files, Path, Paths} +import java.util.concurrent.CopyOnWriteArrayList +import java.util.zip.{CheckedInputStream, CRC32} + +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.concurrent.Promise +import scala.concurrent.duration.Duration +import scala.util.control.NonFatal + +import Artifact._ +import com.google.protobuf.ByteString +import io.grpc.ManagedChannel +import io.grpc.stub.StreamObserver + +import org.apache.spark.connect.proto +import org.apache.spark.connect.proto.AddArtifactsResponse +import org.apache.spark.connect.proto.AddArtifactsResponse.ArtifactSummary +import org.apache.spark.util.{ThreadUtils, Utils} + +/** + * The Artifact Manager is responsible for handling and transferring artifacts from the local + * client to the server (local/remote). + * @param userContext + * @param channel + */ +class ArtifactManager(userContext: proto.UserContext, channel: ManagedChannel) { + // Using the midpoint recommendation of 32KiB for chunk size as specified in + // https://github.com/grpc/grpc.github.io/issues/371. + private val CHUNK_SIZE: Int = 32 * 1024 + + private[this] val stub = proto.SparkConnectServiceGrpc.newStub(channel) + private[this] val classFinders = new CopyOnWriteArrayList[ClassFinder] + + /** + * Register a [[ClassFinder]] for dynamically generated classes. + */ + def register(finder: ClassFinder): Unit = classFinders.add(finder) + + /** + * Add a single artifact to the session. + * + * Currently only local files with extensions .jar and .class are supported. + */ + def addArtifact(path: String): Unit = { +addArtifact(Utils.resolveURI(path)) + } + + private def parseArtifacts(uri: URI): Seq[Artifact] = { +// Currently only local files with extensions .jar and .class are supported. +uri.getScheme match { + case "file" => +val path = Paths.get(uri) +val artifact = path.getFileName.toString match { + case jar if jar.endsWith(".jar") => +newJarArtifact(path.getFileName, new LocalFile(path)) + case cf if cf.endsWith(".class") => +newClassArtifact(path.getFileName, new LocalFile(path)) + case other => +throw new UnsupportedOperationException(s"Unsuppoted file format: $other") +} +Seq[Artifact](artifact) + + case other => +throw new UnsupportedOperationException(s"Unsupported scheme: $other") +} + } + + /** + * Add a single artifact to the session. 
+ * + * Currently only local files with extensions .jar and .class are supported. + */ + def addArtifact(uri: URI): Unit = addArtifacts(parseArtifacts(uri)) + + /** + * Ensure that all classfile artifacts have been uploaded to the server, and are ready for use. + */ + private[client] def ensureAllClassFileArtifactsUploaded(): Unit = { +addArtifacts(classFinders.asScala.flatMap(_.findClasses())) + } + + /** + * Add multiple artifacts to the session. + * + * Currently only local files with extensions .jar and .class are supported. + */ + def addArtifacts(uris: Seq[URI]): Unit = addArtifacts(uris.flatMap(parseArtifacts)) + + /** + * Add a number of artifacts to the session. + */ + private def addArtifacts(artifacts: Iterable[Artifact]): Unit = { +val promise = Promise[Seq[ArtifactSummary]] +val responseHandler = new StreamObserver[proto.AddArtifactsResponse] { + private val summaries = mutable.Buffer.empty[ArtifactSummary] + override def onNext(v: AddArtifactsResponse): Unit = { +v.getArtifactsList.forEach { summary => + summaries += summary +} + } + override def onError(throwable: Throwable): Unit = { +promise.failure(throwable) + } + override def onComple
[GitHub] [spark] zhenlineo commented on a diff in pull request #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client
zhenlineo commented on code in PR #40257: URL: https://github.com/apache/spark/pull/40257#discussion_r1123818828 ## bin/spark-connect: ## @@ -0,0 +1,15 @@ +#!/usr/bin/env bash + +if [ -z "${SPARK_HOME}" ]; then + source "$(dirname "$0")"/find-spark-home +fi + +# Build the jars needed for spark submit +#build/sbt package + +# Build the jars needed for spark connect +build/sbt "connect/assembly" + +CONNECT_JAR=`ls "${SPARK_HOME}"/connector/connect/server/target/scala*/spark-connect-assembly*.jar | paste -sd ',' -` + +exec "${SPARK_HOME}"/bin/spark-submit --class org.apache.spark.sql.connect.SimpleSparkConnectService "$CONNECT_JAR" Review Comment: Where is the other script? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] amaliujia commented on pull request #40259: [SPARK-42609][CONNECT] Add tests for grouping() and grouping_id() functions
amaliujia commented on PR #40259: URL: https://github.com/apache/spark/pull/40259#issuecomment-1452625458 @hvanhovell -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] amaliujia opened a new pull request, #40259: [SPARK-42609][CONNECT] Add tests for grouping() and grouping_id() functions
amaliujia opened a new pull request, #40259: URL: https://github.com/apache/spark/pull/40259

### What changes were proposed in this pull request?
Add tests for grouping() and grouping_id() functions.

### Why are the changes needed?
Improve testing coverage.

### Does this PR introduce _any_ user-facing change?
NO

### How was this patch tested?
UT

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
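For readers skimming the digest, here is a rough sketch (not code from the PR) of the kind of query these functions are exercised with, assuming a running SparkSession named `spark`: `grouping()` flags whether a column was rolled up in a cube/rollup row, and `grouping_id()` packs those flags into a bit vector.

```scala
import org.apache.spark.sql.functions.{col, grouping, grouping_id, sum}

// grouping("g") is 1 on rows where "g" has been rolled up, 0 otherwise;
// grouping_id() encodes the same information as a bit vector over all grouping columns.
val df = spark.range(6).withColumn("g", col("id") % 2)
df.cube("g")
  .agg(grouping("g"), grouping_id(), sum("id"))
  .show()
```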
[GitHub] [spark] hvanhovell commented on a diff in pull request #40238: [SPARK-42633][CONNECT] Make LocalRelation take an actual schema
hvanhovell commented on code in PR #40238: URL: https://github.com/apache/spark/pull/40238#discussion_r1123698953 ## connector/connect/common/src/main/protobuf/spark/connect/relations.proto: ## @@ -353,11 +353,16 @@ message LocalRelation { optional bytes data = 1; // (Optional) The schema of local data. - // It should be either a DDL-formatted type string or a JSON string. // // The server side will update the column names and data types according to this schema. // If the 'data' is not provided, then this schema will be required. - optional string schema = 2; + oneof schema { +// Either a DDL-formatted type string or a JSON string. +string schemaString = 2; Review Comment: thanks; it's been a while since I touched proto. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] hvanhovell commented on a diff in pull request #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client
hvanhovell commented on code in PR #40257: URL: https://github.com/apache/spark/pull/40257#discussion_r1123697756 ## bin/spark-connect-scala-client: ## @@ -0,0 +1,26 @@ +#!/usr/bin/env bash Review Comment: Script files need to have a license as well -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] hvanhovell commented on a diff in pull request #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client
hvanhovell commented on code in PR #40257: URL: https://github.com/apache/spark/pull/40257#discussion_r1123696037 ## bin/spark-connect-scala-client.sc: ## @@ -0,0 +1,15 @@ +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.SparkSession + +val conStr = if (sys.env.contains("SPARK_REMOTE")) sys.env("SPARK_REMOTE") else "" +val sessionBuilder = SparkSession.builder() +val spark = if (conStr.isEmpty) sessionBuilder.build() else sessionBuilder.remote(conStr).build() Review Comment: Can you also set the user agent to something meaningful? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] hvanhovell commented on a diff in pull request #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client
hvanhovell commented on code in PR #40257: URL: https://github.com/apache/spark/pull/40257#discussion_r1123695202 ## bin/spark-connect-scala-client.sc: ## @@ -0,0 +1,15 @@ +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.SparkSession + +val conStr = if (sys.env.contains("SPARK_REMOTE")) sys.env("SPARK_REMOTE") else "" +val sessionBuilder = SparkSession.builder() +val spark = if (conStr.isEmpty) sessionBuilder.build() else sessionBuilder.remote(conStr).build() Review Comment: Oh, BTW, there does not seem to be a SparkSession.Builder.remote() function? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] hvanhovell commented on a diff in pull request #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client
hvanhovell commented on code in PR #40257: URL: https://github.com/apache/spark/pull/40257#discussion_r1123688156 ## bin/spark-connect-scala-client.sc: ## @@ -0,0 +1,15 @@ +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.SparkSession + +val conStr = if (sys.env.contains("SPARK_REMOTE")) sys.env("SPARK_REMOTE") else "" Review Comment: Feature request, can we try to configure this using a `--connectionString` argument? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] hvanhovell closed pull request #40241: [SPARK-42640][CONNECT] Remove stale entries from the excluding rules for CompatibilitySuite
hvanhovell closed pull request #40241: [SPARK-42640][CONNECT] Remove stale entries from the excluding rules for CompatibilitySuite URL: https://github.com/apache/spark/pull/40241 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] hvanhovell commented on pull request #40241: [SPARK-42640][CONNECT] Remove stale entries from the excluding rules for CompatibilitySuite
hvanhovell commented on PR #40241: URL: https://github.com/apache/spark/pull/40241#issuecomment-1452528841 Merging. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] hvanhovell commented on a diff in pull request #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client
hvanhovell commented on code in PR #40257: URL: https://github.com/apache/spark/pull/40257#discussion_r1123661416 ## bin/spark-connect: ## @@ -0,0 +1,15 @@ +#!/usr/bin/env bash + +if [ -z "${SPARK_HOME}" ]; then + source "$(dirname "$0")"/find-spark-home +fi + +# Build the jars needed for spark submit +#build/sbt package Review Comment: Remove comment? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] hvanhovell commented on a diff in pull request #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client
hvanhovell commented on code in PR #40257: URL: https://github.com/apache/spark/pull/40257#discussion_r1123660107 ## bin/spark-connect-scala-client: ## @@ -0,0 +1,26 @@ +#!/usr/bin/env bash + +# Use the spark connect JVM client to connect to a spark connect server. +# +# Start a local server: +# A local spark-connect server with default settings can be started using the following command: +# `bin/spark-connect` +# The client should be able to connect to this server directly with the default client settings. +# +# Connect to a remote server: +# To connect to a remote server, use env var `SPARK_REMOTE` to configure the client connection +# string. e.g. +# `export SPARK_REMOTE="sc://:/;token=;="` + +if [ -z "${SPARK_HOME}" ]; then + source "$(dirname "$0")"/find-spark-home +fi + +# Build the jars needed for spark connect JVM client +build/sbt "sql/package;connect-client-jvm/assembly" + +CONNECT_CLASSPATH="$(build/sbt -DcopyDependencies=false "export connect-client-jvm/fullClasspath" | grep jar | tail -n1)" +SQL_CLASSPATH="$(build/sbt -DcopyDependencies=false "export sql/fullClasspath" | grep jar | tail -n1)" + +INIT_SCRIPT="${SPARK_HOME}"/bin/spark-connect-scala-client.sc +"${SPARK_HOME}"/build/scala*/bin/scala -cp "$CONNECT_CLASSPATH:$SQL_CLASSPATH" -i $INIT_SCRIPT Review Comment: TIL you can do this :)... I have multiple scala versions floating around in my spark dir. We should pick the current one... -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] hvanhovell commented on a diff in pull request #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client
hvanhovell commented on code in PR #40257: URL: https://github.com/apache/spark/pull/40257#discussion_r1123653670 ## bin/spark-connect: ## @@ -0,0 +1,15 @@ +#!/usr/bin/env bash + +if [ -z "${SPARK_HOME}" ]; then + source "$(dirname "$0")"/find-spark-home +fi + +# Build the jars needed for spark submit +#build/sbt package + +# Build the jars needed for spark connect +build/sbt "connect/assembly" + +CONNECT_JAR=`ls "${SPARK_HOME}"/connector/connect/server/target/scala*/spark-connect-assembly*.jar | paste -sd ',' -` + +exec "${SPARK_HOME}"/bin/spark-submit --class org.apache.spark.sql.connect.SimpleSparkConnectService "$CONNECT_JAR" Review Comment: We have two scripts for starting and stopping a connect server. Should we use those instead? If this is for dev, then we should keep these. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] hvanhovell commented on a diff in pull request #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client
hvanhovell commented on code in PR #40257: URL: https://github.com/apache/spark/pull/40257#discussion_r1123654587 ## bin/spark-connect: ## @@ -0,0 +1,15 @@ +#!/usr/bin/env bash Review Comment: Since these scripts are aimed at development, maybe put them in connector/connect/bin? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] hvanhovell commented on a diff in pull request #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client
hvanhovell commented on code in PR #40257: URL: https://github.com/apache/spark/pull/40257#discussion_r1123653086 ## bin/spark-connect-scala-client.sc: ## @@ -0,0 +1,15 @@ +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.SparkSession + +val conStr = if (sys.env.contains("SPARK_REMOTE")) sys.env("SPARK_REMOTE") else "" +val sessionBuilder = SparkSession.builder() +val spark = if (conStr.isEmpty) sessionBuilder.build() else sessionBuilder.remote(conStr).build() Review Comment: Do we default to a local connection if none is set? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] shrprasa opened a new pull request, #40258: [WIP][SPARK-42655]:Incorrect ambiguous column reference error
shrprasa opened a new pull request, #40258: URL: https://github.com/apache/spark/pull/40258 Incorrect ambiguous column reference error -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] hvanhovell commented on a diff in pull request #40256: [SPARK-42653][CONNECT] Artifact transfer from Scala/JVM client to Server
hvanhovell commented on code in PR #40256: URL: https://github.com/apache/spark/pull/40256#discussion_r1123642955 ## connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/ArtifactSuite.scala: ## @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.connect.client + +import java.io.InputStream +import java.nio.file.{Files, Path, Paths} +import java.util.concurrent.TimeUnit +import java.util.zip.{CheckedInputStream, CRC32} + +import com.google.protobuf.ByteString +import io.grpc.{ManagedChannel, Server} +import io.grpc.inprocess.{InProcessChannelBuilder, InProcessServerBuilder} +import org.scalatest.BeforeAndAfterEach + +import org.apache.spark.connect.proto +import org.apache.spark.connect.proto.AddArtifactsRequest +import org.apache.spark.sql.connect.client.util.ConnectFunSuite + +class ArtifactSuite extends ConnectFunSuite with BeforeAndAfterEach { + + private var client: SparkConnectClient = _ + private var service: DummySparkConnectService = _ + private var server: Server = _ + private var artifactManager: ArtifactManager = _ + private var channel: ManagedChannel = _ + + private def startDummyServer(): Unit = { +service = new DummySparkConnectService() +server = InProcessServerBuilder + .forName(getClass.getName) + .addService(service) + .build() +server.start() + } + + private def createArtifactManager(): Unit = { +channel = InProcessChannelBuilder.forName(getClass.getName).directExecutor().build() +artifactManager = new ArtifactManager(proto.UserContext.newBuilder().build(), channel) + } + + override def beforeEach(): Unit = { +super.beforeEach() +startDummyServer() +createArtifactManager() +client = null + } + + override def afterEach(): Unit = { +if (server != null) { + server.shutdownNow() + assert(server.awaitTermination(5, TimeUnit.SECONDS), "server failed to shutdown") +} + +if (channel != null) { + channel.shutdownNow() +} + +if (client != null) { + client.shutdown() +} + } + + private val CHUNK_SIZE: Int = 32 * 1024 + protected def artifactFilePath: Path = baseResourcePath.resolve("artifact-tests") + + /** + * Check if the data sent to the server (stored in `artifactChunk`) is equivalent to the local + * data at `localPath`. 
+ * @param artifactChunk + * @param localPath + */ + private def assertFileDataEquality( + artifactChunk: AddArtifactsRequest.ArtifactChunk, + localPath: Path): Unit = { +val in = new CheckedInputStream(Files.newInputStream(localPath), new CRC32) +val localData = ByteString.readFrom(in) +assert(artifactChunk.getData == localData) +assert(artifactChunk.getCrc == in.getChecksum.getValue) + } + + private def singleChunkArtifactTest(path: String): Unit = { +test(s"Single Chunk Artifact - $path") { + val artifactPath = artifactFilePath.resolve(path) + artifactManager.addArtifact(artifactPath.toString) + + val receivedRequests = service.getAndClearLatestAddArtifactRequests() + // Single `AddArtifactRequest` + assert(receivedRequests.size == 1) + + val request = receivedRequests.head + assert(request.hasBatch) + + val batch = request.getBatch + // Single artifact in batch + assert(batch.getArtifactsList.size() == 1) + + val singleChunkArtifact = batch.getArtifacts(0) + val namePrefix = artifactPath.getFileName.toString match { +case jar if jar.endsWith(".jar") => "jars" +case cf if cf.endsWith(".class") => "classes" + } + assert(singleChunkArtifact.getName.equals(namePrefix + "/" + path)) + assertFileDataEquality(singleChunkArtifact.getData, artifactPath) +} + } + + singleChunkArtifactTest("smallClassFile.class") + + singleChunkArtifactTest("smallJar.jar") + + private def readNextChunk(in: InputStream): ByteString = { +val buf = new Array[Byte](CHUNK_SIZE) +var bytesRead = 0 +var count = 0 +while (count != -1 && bytesRead < CHUNK_SIZE) { + count = in.read(buf, bytesRead, CHUNK_SIZE - bytesRead) + if (count != -1) { +bytesRead += count + } +} +
[GitHub] [spark] zhenlineo opened a new pull request, #40257: [SPARK-42656][CONNECT] Adding SCALA REPL shell script for JVM client
zhenlineo opened a new pull request, #40257: URL: https://github.com/apache/spark/pull/40257

### What changes were proposed in this pull request?
Adds a simple script to start the Scala client in the Scala REPL, as well as a script to start the Spark Connect server for the client to connect to.

### Why are the changes needed?
Makes the JVM client easier for users to use.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Manually tested.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] hvanhovell commented on a diff in pull request #40256: [SPARK-42653][CONNECT] Artifact transfer from Scala/JVM client to Server
hvanhovell commented on code in PR #40256: URL: https://github.com/apache/spark/pull/40256#discussion_r1123591424 ## connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala: ## @@ -0,0 +1,336 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.connect.client + +import java.io.{ByteArrayInputStream, InputStream} +import java.net.URI +import java.nio.file.{Files, Path, Paths} +import java.util.concurrent.CopyOnWriteArrayList +import java.util.zip.{CheckedInputStream, CRC32} + +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.concurrent.Promise +import scala.concurrent.duration.Duration +import scala.util.control.NonFatal + +import Artifact._ +import com.google.protobuf.ByteString +import io.grpc.ManagedChannel +import io.grpc.stub.StreamObserver + +import org.apache.spark.connect.proto +import org.apache.spark.connect.proto.AddArtifactsResponse +import org.apache.spark.connect.proto.AddArtifactsResponse.ArtifactSummary +import org.apache.spark.util.{ThreadUtils, Utils} + +/** + * The Artifact Manager is responsible for handling and transferring artifacts from the local + * client to the server (local/remote). + * @param userContext + * @param channel + */ +class ArtifactManager(userContext: proto.UserContext, channel: ManagedChannel) { + // Using the midpoint recommendation of 32KiB for chunk size as specified in + // https://github.com/grpc/grpc.github.io/issues/371. + private val CHUNK_SIZE: Int = 32 * 1024 + + private[this] val stub = proto.SparkConnectServiceGrpc.newStub(channel) + private[this] val classFinders = new CopyOnWriteArrayList[ClassFinder] + + /** + * Register a [[ClassFinder]] for dynamically generated classes. + */ + def register(finder: ClassFinder): Unit = classFinders.add(finder) + + /** + * Add a single artifact to the session. + * + * Currently only local files with extensions .jar and .class are supported. + */ + def addArtifact(path: String): Unit = { +addArtifact(Utils.resolveURI(path)) + } + + private def parseArtifacts(uri: URI): Seq[Artifact] = { +// Currently only local files with extensions .jar and .class are supported. +uri.getScheme match { + case "file" => +val path = Paths.get(uri) +val artifact = path.getFileName.toString match { + case jar if jar.endsWith(".jar") => +newJarArtifact(path.getFileName, new LocalFile(path)) + case cf if cf.endsWith(".class") => +newClassArtifact(path.getFileName, new LocalFile(path)) + case other => +throw new UnsupportedOperationException(s"Unsuppoted file format: $other") +} +Seq[Artifact](artifact) + + case other => +throw new UnsupportedOperationException(s"Unsupported scheme: $other") +} + } + + /** + * Add a single artifact to the session. 
+ * + * Currently only local files with extensions .jar and .class are supported. + */ + def addArtifact(uri: URI): Unit = addArtifacts(parseArtifacts(uri)) + + /** + * Ensure that all classfile artifacts have been uploaded to the server, and are ready for use. + */ + private[client] def ensureAllClassFileArtifactsUploaded(): Unit = { +addArtifacts(classFinders.asScala.flatMap(_.findClasses())) + } + + /** + * Add multiple artifacts to the session. + * + * Currently only local files with extensions .jar and .class are supported. + */ + def addArtifacts(uris: Seq[URI]): Unit = addArtifacts(uris.flatMap(parseArtifacts)) + + /** + * Add a number of artifacts to the session. + */ + private def addArtifacts(artifacts: Iterable[Artifact]): Unit = { +val promise = Promise[Seq[ArtifactSummary]] +val responseHandler = new StreamObserver[proto.AddArtifactsResponse] { + private val summaries = mutable.Buffer.empty[ArtifactSummary] + override def onNext(v: AddArtifactsResponse): Unit = { +v.getArtifactsList.forEach { summary => + summaries += summary +} + } + override def onError(throwable: Throwable): Unit = { +promise.failure(throwable) + } + override def onCompl
[GitHub] [spark] hvanhovell commented on a diff in pull request #40256: [SPARK-42653][CONNECT] Artifact transfer from Scala/JVM client to Server
hvanhovell commented on code in PR #40256: URL: https://github.com/apache/spark/pull/40256#discussion_r1123590597 ## connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala: ## @@ -0,0 +1,336 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.connect.client + +import java.io.{ByteArrayInputStream, InputStream} +import java.net.URI +import java.nio.file.{Files, Path, Paths} +import java.util.concurrent.CopyOnWriteArrayList +import java.util.zip.{CheckedInputStream, CRC32} + +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.concurrent.Promise +import scala.concurrent.duration.Duration +import scala.util.control.NonFatal + +import Artifact._ +import com.google.protobuf.ByteString +import io.grpc.ManagedChannel +import io.grpc.stub.StreamObserver + +import org.apache.spark.connect.proto +import org.apache.spark.connect.proto.AddArtifactsResponse +import org.apache.spark.connect.proto.AddArtifactsResponse.ArtifactSummary +import org.apache.spark.util.{ThreadUtils, Utils} + +/** + * The Artifact Manager is responsible for handling and transferring artifacts from the local + * client to the server (local/remote). + * @param userContext + * @param channel + */ +class ArtifactManager(userContext: proto.UserContext, channel: ManagedChannel) { + // Using the midpoint recommendation of 32KiB for chunk size as specified in + // https://github.com/grpc/grpc.github.io/issues/371. + private val CHUNK_SIZE: Int = 32 * 1024 + + private[this] val stub = proto.SparkConnectServiceGrpc.newStub(channel) + private[this] val classFinders = new CopyOnWriteArrayList[ClassFinder] + + /** + * Register a [[ClassFinder]] for dynamically generated classes. + */ + def register(finder: ClassFinder): Unit = classFinders.add(finder) + + /** + * Add a single artifact to the session. + * + * Currently only local files with extensions .jar and .class are supported. + */ + def addArtifact(path: String): Unit = { +addArtifact(Utils.resolveURI(path)) + } + + private def parseArtifacts(uri: URI): Seq[Artifact] = { +// Currently only local files with extensions .jar and .class are supported. +uri.getScheme match { + case "file" => +val path = Paths.get(uri) +val artifact = path.getFileName.toString match { + case jar if jar.endsWith(".jar") => +newJarArtifact(path.getFileName, new LocalFile(path)) + case cf if cf.endsWith(".class") => +newClassArtifact(path.getFileName, new LocalFile(path)) + case other => +throw new UnsupportedOperationException(s"Unsuppoted file format: $other") +} +Seq[Artifact](artifact) + + case other => +throw new UnsupportedOperationException(s"Unsupported scheme: $other") +} + } + + /** + * Add a single artifact to the session. 
+ * + * Currently only local files with extensions .jar and .class are supported. + */ + def addArtifact(uri: URI): Unit = addArtifacts(parseArtifacts(uri)) + + /** + * Ensure that all classfile artifacts have been uploaded to the server, and are ready for use. + */ + private[client] def ensureAllClassFileArtifactsUploaded(): Unit = { +addArtifacts(classFinders.asScala.flatMap(_.findClasses())) + } + + /** + * Add multiple artifacts to the session. + * + * Currently only local files with extensions .jar and .class are supported. + */ + def addArtifacts(uris: Seq[URI]): Unit = addArtifacts(uris.flatMap(parseArtifacts)) + + /** + * Add a number of artifacts to the session. + */ + private def addArtifacts(artifacts: Iterable[Artifact]): Unit = { +val promise = Promise[Seq[ArtifactSummary]] +val responseHandler = new StreamObserver[proto.AddArtifactsResponse] { + private val summaries = mutable.Buffer.empty[ArtifactSummary] + override def onNext(v: AddArtifactsResponse): Unit = { +v.getArtifactsList.forEach { summary => + summaries += summary +} + } + override def onError(throwable: Throwable): Unit = { +promise.failure(throwable) + } + override def onCompl
[GitHub] [spark] hvanhovell commented on a diff in pull request #40256: [SPARK-42653][CONNECT] Artifact transfer from Scala/JVM client to Server
hvanhovell commented on code in PR #40256: URL: https://github.com/apache/spark/pull/40256#discussion_r1123590310 ## connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala: ## @@ -0,0 +1,336 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.connect.client + +import java.io.{ByteArrayInputStream, InputStream} +import java.net.URI +import java.nio.file.{Files, Path, Paths} +import java.util.concurrent.CopyOnWriteArrayList +import java.util.zip.{CheckedInputStream, CRC32} + +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.concurrent.Promise +import scala.concurrent.duration.Duration +import scala.util.control.NonFatal + +import Artifact._ +import com.google.protobuf.ByteString +import io.grpc.ManagedChannel +import io.grpc.stub.StreamObserver + +import org.apache.spark.connect.proto +import org.apache.spark.connect.proto.AddArtifactsResponse +import org.apache.spark.connect.proto.AddArtifactsResponse.ArtifactSummary +import org.apache.spark.util.{ThreadUtils, Utils} + +/** + * The Artifact Manager is responsible for handling and transferring artifacts from the local + * client to the server (local/remote). + * @param userContext + * @param channel + */ +class ArtifactManager(userContext: proto.UserContext, channel: ManagedChannel) { + // Using the midpoint recommendation of 32KiB for chunk size as specified in + // https://github.com/grpc/grpc.github.io/issues/371. + private val CHUNK_SIZE: Int = 32 * 1024 + + private[this] val stub = proto.SparkConnectServiceGrpc.newStub(channel) + private[this] val classFinders = new CopyOnWriteArrayList[ClassFinder] + + /** + * Register a [[ClassFinder]] for dynamically generated classes. + */ + def register(finder: ClassFinder): Unit = classFinders.add(finder) + + /** + * Add a single artifact to the session. + * + * Currently only local files with extensions .jar and .class are supported. + */ + def addArtifact(path: String): Unit = { +addArtifact(Utils.resolveURI(path)) + } + + private def parseArtifacts(uri: URI): Seq[Artifact] = { +// Currently only local files with extensions .jar and .class are supported. +uri.getScheme match { + case "file" => +val path = Paths.get(uri) +val artifact = path.getFileName.toString match { + case jar if jar.endsWith(".jar") => +newJarArtifact(path.getFileName, new LocalFile(path)) + case cf if cf.endsWith(".class") => +newClassArtifact(path.getFileName, new LocalFile(path)) + case other => +throw new UnsupportedOperationException(s"Unsuppoted file format: $other") +} +Seq[Artifact](artifact) + + case other => +throw new UnsupportedOperationException(s"Unsupported scheme: $other") +} + } + + /** + * Add a single artifact to the session. 
+ * + * Currently only local files with extensions .jar and .class are supported. + */ + def addArtifact(uri: URI): Unit = addArtifacts(parseArtifacts(uri)) + + /** + * Ensure that all classfile artifacts have been uploaded to the server, and are ready for use. + */ + private[client] def ensureAllClassFileArtifactsUploaded(): Unit = { +addArtifacts(classFinders.asScala.flatMap(_.findClasses())) + } + + /** + * Add multiple artifacts to the session. + * + * Currently only local files with extensions .jar and .class are supported. + */ + def addArtifacts(uris: Seq[URI]): Unit = addArtifacts(uris.flatMap(parseArtifacts)) + + /** + * Add a number of artifacts to the session. + */ + private def addArtifacts(artifacts: Iterable[Artifact]): Unit = { +val promise = Promise[Seq[ArtifactSummary]] +val responseHandler = new StreamObserver[proto.AddArtifactsResponse] { + private val summaries = mutable.Buffer.empty[ArtifactSummary] + override def onNext(v: AddArtifactsResponse): Unit = { +v.getArtifactsList.forEach { summary => + summaries += summary +} + } + override def onError(throwable: Throwable): Unit = { +promise.failure(throwable) + } + override def onCompl
[GitHub] [spark] hvanhovell commented on a diff in pull request #40256: [SPARK-42653][CONNECT] Artifact transfer from Scala/JVM client to Server
hvanhovell commented on code in PR #40256: URL: https://github.com/apache/spark/pull/40256#discussion_r1123588842 ## connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala: ## @@ -0,0 +1,336 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.connect.client + +import java.io.{ByteArrayInputStream, InputStream} +import java.net.URI +import java.nio.file.{Files, Path, Paths} +import java.util.concurrent.CopyOnWriteArrayList +import java.util.zip.{CheckedInputStream, CRC32} + +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.concurrent.Promise +import scala.concurrent.duration.Duration +import scala.util.control.NonFatal + +import Artifact._ +import com.google.protobuf.ByteString +import io.grpc.ManagedChannel +import io.grpc.stub.StreamObserver + +import org.apache.spark.connect.proto +import org.apache.spark.connect.proto.AddArtifactsResponse +import org.apache.spark.connect.proto.AddArtifactsResponse.ArtifactSummary +import org.apache.spark.util.{ThreadUtils, Utils} + +/** + * The Artifact Manager is responsible for handling and transferring artifacts from the local + * client to the server (local/remote). + * @param userContext + * @param channel + */ +class ArtifactManager(userContext: proto.UserContext, channel: ManagedChannel) { + // Using the midpoint recommendation of 32KiB for chunk size as specified in + // https://github.com/grpc/grpc.github.io/issues/371. + private val CHUNK_SIZE: Int = 32 * 1024 + + private[this] val stub = proto.SparkConnectServiceGrpc.newStub(channel) + private[this] val classFinders = new CopyOnWriteArrayList[ClassFinder] + + /** + * Register a [[ClassFinder]] for dynamically generated classes. + */ + def register(finder: ClassFinder): Unit = classFinders.add(finder) + + /** + * Add a single artifact to the session. + * + * Currently only local files with extensions .jar and .class are supported. + */ + def addArtifact(path: String): Unit = { +addArtifact(Utils.resolveURI(path)) + } + + private def parseArtifacts(uri: URI): Seq[Artifact] = { +// Currently only local files with extensions .jar and .class are supported. +uri.getScheme match { + case "file" => +val path = Paths.get(uri) +val artifact = path.getFileName.toString match { + case jar if jar.endsWith(".jar") => +newJarArtifact(path.getFileName, new LocalFile(path)) + case cf if cf.endsWith(".class") => +newClassArtifact(path.getFileName, new LocalFile(path)) + case other => +throw new UnsupportedOperationException(s"Unsuppoted file format: $other") +} +Seq[Artifact](artifact) + + case other => +throw new UnsupportedOperationException(s"Unsupported scheme: $other") +} + } + + /** + * Add a single artifact to the session. 
+ * + * Currently only local files with extensions .jar and .class are supported. + */ + def addArtifact(uri: URI): Unit = addArtifacts(parseArtifacts(uri)) + + /** + * Ensure that all classfile artifacts have been uploaded to the server, and are ready for use. + */ + private[client] def ensureAllClassFileArtifactsUploaded(): Unit = { +addArtifacts(classFinders.asScala.flatMap(_.findClasses())) + } + + /** + * Add multiple artifacts to the session. + * + * Currently only local files with extensions .jar and .class are supported. + */ + def addArtifacts(uris: Seq[URI]): Unit = addArtifacts(uris.flatMap(parseArtifacts)) + + /** + * Add a number of artifacts to the session. + */ + private def addArtifacts(artifacts: Iterable[Artifact]): Unit = { +val promise = Promise[Seq[ArtifactSummary]] +val responseHandler = new StreamObserver[proto.AddArtifactsResponse] { + private val summaries = mutable.Buffer.empty[ArtifactSummary] + override def onNext(v: AddArtifactsResponse): Unit = { +v.getArtifactsList.forEach { summary => + summaries += summary +} + } + override def onError(throwable: Throwable): Unit = { +promise.failure(throwable) + } + override def onCompl
[GitHub] [spark] peter-toth commented on a diff in pull request #40093: [SPARK-42500][SQL] ConstantPropagation supports more cases
peter-toth commented on code in PR #40093: URL: https://github.com/apache/spark/pull/40093#discussion_r1123582002 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala: ## @@ -200,14 +200,20 @@ object ConstantPropagation extends Rule[LogicalPlan] { private def replaceConstants(condition: Expression, equalityPredicates: EqualityPredicates) : Expression = { -val constantsMap = AttributeMap(equalityPredicates.map(_._1)) -val predicates = equalityPredicates.map(_._2).toSet -def replaceConstants0(expression: Expression) = expression transform { +val allConstantsMap = AttributeMap(equalityPredicates.map(_._1)) +val allPredicates = equalityPredicates.map(_._2).toSet +def replaceConstants0( +expression: Expression, constantsMap: AttributeMap[Literal]) = expression transform { case a: AttributeReference => constantsMap.getOrElse(a, a) } condition transform { - case e @ EqualTo(_, _) if !predicates.contains(e) => replaceConstants0(e) - case e @ EqualNullSafe(_, _) if !predicates.contains(e) => replaceConstants0(e) + case b: BinaryComparison => Review Comment: Let me open a PR tomorrow. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
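As background for the rule under discussion, a minimal sketch of the kind of conjunction the extended ConstantPropagation targets (illustrative only, not code from the PR; it assumes a running SparkSession named `spark`, and the exact optimized plan depends on the Spark version and on which other rules fire): once `a = 1` is known within a conjunction, the literal can be substituted into the other comparison, turning `a = 1 AND a > b` into `a = 1 AND 1 > b`.

```scala
// A filter where one conjunct pins "a" to a constant and another compares "a" to "b".
// With constant propagation extended to general binary comparisons, the optimizer can
// rewrite the second conjunct to use the literal 1 directly.
val df = spark.range(10).selectExpr("id AS a", "id + 1 AS b")
df.filter("a = 1 AND a > b").explain(true)
```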