Re: [PR] [SPARK-48238][BUILD][YARN] Replace YARN AmIpFilter with a forked implementation [spark]
pan3793 commented on PR #46611: URL: https://github.com/apache/spark/pull/46611#issuecomment-2118631633 @cloud-fan I updated the code to preserve the original code as much as possible, and also left comments to clarify the modifications. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-48105][SS][3.5] Fix the race condition between state store unloading and snapshotting [spark]
HeartSaVioR commented on PR #46415: URL: https://github.com/apache/spark/pull/46415#issuecomment-2118631470 Also merged into 3.4.
[PR] test maven [spark]
panbingkun opened a new pull request, #46648: URL: https://github.com/apache/spark/pull/46648 ### What changes were proposed in this pull request? ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? ### How was this patch tested? ### Was this patch authored or co-authored using generative AI tooling?
Re: [PR] assorted copy edits to migration instructions [spark]
github-actions[bot] commented on PR #45048: URL: https://github.com/apache/spark/pull/45048#issuecomment-2118519868 We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!
Re: [PR] [SPARK-43829][CONNECT] Improve SparkConnectPlanner by reuse Dataset and avoid construct new Dataset [spark]
github-actions[bot] commented on PR #43473: URL: https://github.com/apache/spark/pull/43473#issuecomment-2118519899 We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!
Re: [PR] [SPARK-48017] Add Spark application submission worker for operator [spark-kubernetes-operator]
jiangzho commented on code in PR #10: URL: https://github.com/apache/spark-kubernetes-operator/pull/10#discussion_r1605620527 ## spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorker.java: ## @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.spark.k8s.operator; + +import java.math.BigInteger; +import java.util.Map; + +import scala.Option; + +import io.fabric8.kubernetes.client.KubernetesClient; +import org.apache.commons.codec.digest.DigestUtils; +import org.apache.commons.collections.MapUtils; +import org.apache.commons.lang3.StringUtils; + +import org.apache.spark.SparkConf; +import org.apache.spark.deploy.k8s.KubernetesDriverSpec; +import org.apache.spark.deploy.k8s.submit.JavaMainAppResource; +import org.apache.spark.deploy.k8s.submit.KubernetesDriverBuilder; +import org.apache.spark.deploy.k8s.submit.MainAppResource; +import org.apache.spark.deploy.k8s.submit.PythonMainAppResource; +import org.apache.spark.deploy.k8s.submit.RMainAppResource; +import org.apache.spark.k8s.operator.spec.ApplicationSpec; +import org.apache.spark.k8s.operator.utils.ModelUtils; + +/** + * Similar to org.apache.spark.deploy.k8s.submit.KubernetesClientApplication. This reads args from + * SparkApplication instead of starting separate spark-submit process + */ +public class SparkAppSubmissionWorker { + // Default length limit for generated app id. Generated id is used as resource-prefix when + // user-provided id is too long for this purpose. This applied to all resources associated with + // the Spark app (including k8s service which has different naming length limit). This we Review Comment: thanks for the catch! fixed the typo.
Re: [PR] [SPARK-48017] Add Spark application submission worker for operator [spark-kubernetes-operator]
jiangzho commented on code in PR #10: URL: https://github.com/apache/spark-kubernetes-operator/pull/10#discussion_r1605620441 ## spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorker.java: ## @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.spark.k8s.operator; + +import java.math.BigInteger; +import java.util.Map; + +import scala.Option; + +import io.fabric8.kubernetes.client.KubernetesClient; +import org.apache.commons.codec.digest.DigestUtils; +import org.apache.commons.collections.MapUtils; +import org.apache.commons.lang3.StringUtils; + +import org.apache.spark.SparkConf; +import org.apache.spark.deploy.k8s.KubernetesDriverSpec; +import org.apache.spark.deploy.k8s.submit.JavaMainAppResource; +import org.apache.spark.deploy.k8s.submit.KubernetesDriverBuilder; +import org.apache.spark.deploy.k8s.submit.MainAppResource; +import org.apache.spark.deploy.k8s.submit.PythonMainAppResource; +import org.apache.spark.deploy.k8s.submit.RMainAppResource; +import org.apache.spark.k8s.operator.spec.ApplicationSpec; +import org.apache.spark.k8s.operator.utils.ModelUtils; + +/** + * Similar to org.apache.spark.deploy.k8s.submit.KubernetesClientApplication. This reads args from + * SparkApplication instead of starting separate spark-submit process + */ +public class SparkAppSubmissionWorker { + // Default length limit for generated app id. Generated id is used as resource-prefix when + // user-provided id is too long for this purpose. This applied to all resources associated with + // the Spark app (including k8s service which has different naming length limit). This we + // truncate the hash part to 46 chars to leave some margin for spark resource prefix and suffix + // (e.g. 'spark-', '-driver-svc' . 
etc) + public static final int DEFAULT_ID_LENGTH_LIMIT = 46; + // Default length limit to be applied to the hash-based part of generated id + public static final int DEFAULT_HASH_BASED_IDENTIFIER_LENGTH_LIMIT = 36; + // Radix value used when generating hash-based identifier + public static final int DEFAULT_ENCODE_BASE = 36; + + public SparkAppResourceSpec getResourceSpec( + SparkApplication app, KubernetesClient client, Map confOverrides) { +SparkAppDriverConf appDriverConf = buildDriverConf(app, confOverrides); +return buildResourceSpec(appDriverConf, client); + } + + protected SparkAppDriverConf buildDriverConf( + SparkApplication app, Map confOverrides) { +ApplicationSpec applicationSpec = app.getSpec(); +SparkConf effectiveSparkConf = new SparkConf(); +if (MapUtils.isNotEmpty(applicationSpec.getSparkConf())) { + for (String confKey : applicationSpec.getSparkConf().keySet()) { +effectiveSparkConf.set(confKey, applicationSpec.getSparkConf().get(confKey)); + } +} +if (MapUtils.isNotEmpty(confOverrides)) { + for (Map.Entry entry : confOverrides.entrySet()) { +effectiveSparkConf.set(entry.getKey(), entry.getValue()); + } +} +effectiveSparkConf.set("spark.kubernetes.namespace", app.getMetadata().getNamespace()); +MainAppResource primaryResource = new JavaMainAppResource(Option.empty()); +if (StringUtils.isNotEmpty(applicationSpec.getJars())) { + primaryResource = new JavaMainAppResource(Option.apply(applicationSpec.getJars())); + effectiveSparkConf.setIfMissing("spark.jars", applicationSpec.getJars()); +} else if (StringUtils.isNotEmpty(applicationSpec.getPyFiles())) { + primaryResource = new PythonMainAppResource(applicationSpec.getPyFiles()); + effectiveSparkConf.setIfMissing("spark.submit.pyFiles", applicationSpec.getPyFiles()); +} else if (StringUtils.isNotEmpty(applicationSpec.getSparkRFiles())) { + primaryResource = new RMainAppResource(applicationSpec.getSparkRFiles()); +} +effectiveSparkConf.setIfMissing( +"spark.master", 
"k8s://https://$KUBERNETES_SERVICE_HOST:$KUBERNETES_SERVICE_PORT"); Review Comment: yes - it should be possible to use custom Cluster Manager by setting `spark.master`. When master is not explicitly set in SparkConf, this would automatically generate master URL based
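The comment above describes deriving the default `spark.master` URL from the standard in-cluster service environment variables. A minimal, self-contained sketch of that derivation (not the operator's actual code; the helper name is hypothetical):

```java
// Sketch: build the in-cluster Kubernetes master URL the way the default
// "k8s://https://$KUBERNETES_SERVICE_HOST:$KUBERNETES_SERVICE_PORT" value implies.
public class MasterUrlSketch {
  // The "k8s://" prefix tells Spark to use the Kubernetes cluster manager;
  // the rest is the API server address exposed to every in-cluster pod.
  static String inClusterMasterUrl(String host, String port) {
    return "k8s://https://" + host + ":" + port;
  }

  public static void main(String[] args) {
    // Fallback values are illustrative; inside a pod these env vars are set.
    String host = System.getenv().getOrDefault("KUBERNETES_SERVICE_HOST", "10.0.0.1");
    String port = System.getenv().getOrDefault("KUBERNETES_SERVICE_PORT", "443");
    System.out.println(inClusterMasterUrl(host, port));
  }
}
```

As the comment notes, setting `spark.master` explicitly in SparkConf bypasses this default, which is what allows a custom cluster manager.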
Re: [PR] [SPARK-48017] Add Spark application submission worker for operator [spark-kubernetes-operator]
jiangzho commented on code in PR #10: URL: https://github.com/apache/spark-kubernetes-operator/pull/10#discussion_r1605619790 ## spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorker.java: ## @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.spark.k8s.operator; + +import java.math.BigInteger; +import java.util.Map; + +import scala.Option; + +import io.fabric8.kubernetes.client.KubernetesClient; +import org.apache.commons.codec.digest.DigestUtils; +import org.apache.commons.collections.MapUtils; +import org.apache.commons.lang3.StringUtils; + +import org.apache.spark.SparkConf; +import org.apache.spark.deploy.k8s.KubernetesDriverSpec; +import org.apache.spark.deploy.k8s.submit.JavaMainAppResource; +import org.apache.spark.deploy.k8s.submit.KubernetesDriverBuilder; +import org.apache.spark.deploy.k8s.submit.MainAppResource; +import org.apache.spark.deploy.k8s.submit.PythonMainAppResource; +import org.apache.spark.deploy.k8s.submit.RMainAppResource; +import org.apache.spark.k8s.operator.spec.ApplicationSpec; +import org.apache.spark.k8s.operator.utils.ModelUtils; + +/** + * Similar to org.apache.spark.deploy.k8s.submit.KubernetesClientApplication. This reads args from + * SparkApplication instead of starting separate spark-submit process + */ +public class SparkAppSubmissionWorker { + // Default length limit for generated app id. Generated id is used as resource-prefix when + // user-provided id is too long for this purpose. This applied to all resources associated with + // the Spark app (including k8s service which has different naming length limit). This we + // truncate the hash part to 46 chars to leave some margin for spark resource prefix and suffix + // (e.g. 'spark-', '-driver-svc' . 
etc) + public static final int DEFAULT_ID_LENGTH_LIMIT = 46; + // Default length limit to be applied to the hash-based part of generated id + public static final int DEFAULT_HASH_BASED_IDENTIFIER_LENGTH_LIMIT = 36; + // Radix value used when generating hash-based identifier + public static final int DEFAULT_ENCODE_BASE = 36; + + public SparkAppResourceSpec getResourceSpec( + SparkApplication app, KubernetesClient client, Map confOverrides) { +SparkAppDriverConf appDriverConf = buildDriverConf(app, confOverrides); +return buildResourceSpec(appDriverConf, client); + } + + protected SparkAppDriverConf buildDriverConf( + SparkApplication app, Map confOverrides) { +ApplicationSpec applicationSpec = app.getSpec(); +SparkConf effectiveSparkConf = new SparkConf(); +if (MapUtils.isNotEmpty(applicationSpec.getSparkConf())) { + for (String confKey : applicationSpec.getSparkConf().keySet()) { +effectiveSparkConf.set(confKey, applicationSpec.getSparkConf().get(confKey)); + } +} +if (MapUtils.isNotEmpty(confOverrides)) { + for (Map.Entry entry : confOverrides.entrySet()) { +effectiveSparkConf.set(entry.getKey(), entry.getValue()); + } +} +effectiveSparkConf.set("spark.kubernetes.namespace", app.getMetadata().getNamespace()); +MainAppResource primaryResource = new JavaMainAppResource(Option.empty()); +if (StringUtils.isNotEmpty(applicationSpec.getJars())) { + primaryResource = new JavaMainAppResource(Option.apply(applicationSpec.getJars())); + effectiveSparkConf.setIfMissing("spark.jars", applicationSpec.getJars()); +} else if (StringUtils.isNotEmpty(applicationSpec.getPyFiles())) { + primaryResource = new PythonMainAppResource(applicationSpec.getPyFiles()); + effectiveSparkConf.setIfMissing("spark.submit.pyFiles", applicationSpec.getPyFiles()); +} else if (StringUtils.isNotEmpty(applicationSpec.getSparkRFiles())) { + primaryResource = new RMainAppResource(applicationSpec.getSparkRFiles()); +} +effectiveSparkConf.setIfMissing( +"spark.master", 
"k8s://https://$KUBERNETES_SERVICE_HOST:$KUBERNETES_SERVICE_PORT"); +String appId = generateSparkAppId(app); +effectiveSparkConf.setIfMissing("spark.app.id", appId); +return SparkAppDriverConf.create( +effectiveSparkConf, +appId, +
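The quoted `buildDriverConf` applies configuration in three layers: the spec's `sparkConf` first, then `confOverrides` (which win on conflict), and finally defaults such as `spark.master` and `spark.app.id` via `setIfMissing`. A simplified sketch of that precedence using a plain map (names hypothetical, not the operator's API):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of the layering in buildDriverConf: spec conf, then overrides,
// then setIfMissing-style defaults that only fill gaps.
public class ConfPrecedenceSketch {
  static Map<String, String> effectiveConf(
      Map<String, String> specConf, Map<String, String> overrides) {
    Map<String, String> conf = new LinkedHashMap<>(specConf);
    conf.putAll(overrides); // operator-provided overrides win over spec values
    // Defaults are applied last and never clobber an explicit setting
    conf.putIfAbsent("spark.master",
        "k8s://https://$KUBERNETES_SERVICE_HOST:$KUBERNETES_SERVICE_PORT");
    return conf;
  }
}
```

This ordering means a user-specified `spark.master` in the application spec survives, while unset keys fall back to operator defaults.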
Re: [PR] Writing OperatorStateMetadata for the TransformWithState operator [spark]
ericm-db closed pull request #46647: Writing OperatorStateMetadata for the TransformWithState operator URL: https://github.com/apache/spark/pull/46647
[PR] Writing OperatorStateMetadata for the TransformWithState operator [spark]
ericm-db opened a new pull request, #46647: URL: https://github.com/apache/spark/pull/46647 ### What changes were proposed in this pull request? ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? ### How was this patch tested? ### Was this patch authored or co-authored using generative AI tooling?
Re: [PR] [WIP] OperatorStateMetadata [spark]
ericm-db closed pull request #46645: [WIP] OperatorStateMetadata URL: https://github.com/apache/spark/pull/46645
Re: [PR] [SPARK-48324][SQL] Codegen Support for `hll_sketch_estimate` and `hll_union` [spark]
panbingkun commented on PR #46639: URL: https://github.com/apache/spark/pull/46639#issuecomment-2118470856 cc @yaooqinn @cloud-fan
[PR] [SPARK-48328][BUILD] Upgrade `Arrow` to 16.1.0 [spark]
panbingkun opened a new pull request, #46646: URL: https://github.com/apache/spark/pull/46646 ### What changes were proposed in this pull request? ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? ### How was this patch tested? ### Was this patch authored or co-authored using generative AI tooling?
Re: [PR] [SPARK-48294][SQL][3.5] Handle lowercase in nestedTypeMissingElementTypeError [spark]
gengliangwang closed pull request #46643: [SPARK-48294][SQL][3.5] Handle lowercase in nestedTypeMissingElementTypeError URL: https://github.com/apache/spark/pull/46643
Re: [PR] [SPARK-48294][SQL][3.5] Handle lowercase in nestedTypeMissingElementTypeError [spark]
gengliangwang commented on PR #46643: URL: https://github.com/apache/spark/pull/46643#issuecomment-2118443352 Thanks, merging to branch-3.5
Re: [PR] [SPARK-48017] Add Spark application submission worker for operator [spark-kubernetes-operator]
dongjoon-hyun commented on code in PR #10: URL: https://github.com/apache/spark-kubernetes-operator/pull/10#discussion_r1605570331 ## spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorkerTest.java: ## @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.spark.k8s.operator; + +import static org.apache.spark.k8s.operator.SparkAppSubmissionWorker.DEFAULT_ID_LENGTH_LIMIT; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockConstruction; +import static org.mockito.Mockito.when; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import io.fabric8.kubernetes.api.model.ObjectMeta; +import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; +import org.apache.commons.lang3.RandomStringUtils; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.mockito.MockedConstruction; + +import org.apache.spark.SparkConf; +import org.apache.spark.deploy.k8s.submit.JavaMainAppResource; +import org.apache.spark.deploy.k8s.submit.PythonMainAppResource; +import org.apache.spark.deploy.k8s.submit.RMainAppResource; +import org.apache.spark.k8s.operator.spec.ApplicationSpec; +import org.apache.spark.k8s.operator.status.ApplicationAttemptSummary; +import org.apache.spark.k8s.operator.status.ApplicationStatus; +import org.apache.spark.k8s.operator.status.AttemptInfo; + +class SparkAppSubmissionWorkerTest { Review Comment: Please add more test coverage. For example, a corner case like `generateHashBasedId` whose input has long long identifiers.
Re: [PR] [SPARK-48017] Add Spark application submission worker for operator [spark-kubernetes-operator]
dongjoon-hyun commented on code in PR #10: URL: https://github.com/apache/spark-kubernetes-operator/pull/10#discussion_r1605569613 ## spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorker.java: ## @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.spark.k8s.operator; + +import java.math.BigInteger; +import java.util.Map; + +import scala.Option; + +import io.fabric8.kubernetes.client.KubernetesClient; +import org.apache.commons.codec.digest.DigestUtils; +import org.apache.commons.collections.MapUtils; +import org.apache.commons.lang3.StringUtils; + +import org.apache.spark.SparkConf; +import org.apache.spark.deploy.k8s.KubernetesDriverSpec; +import org.apache.spark.deploy.k8s.submit.JavaMainAppResource; +import org.apache.spark.deploy.k8s.submit.KubernetesDriverBuilder; +import org.apache.spark.deploy.k8s.submit.MainAppResource; +import org.apache.spark.deploy.k8s.submit.PythonMainAppResource; +import org.apache.spark.deploy.k8s.submit.RMainAppResource; +import org.apache.spark.k8s.operator.spec.ApplicationSpec; +import org.apache.spark.k8s.operator.utils.ModelUtils; + +/** + * Similar to org.apache.spark.deploy.k8s.submit.KubernetesClientApplication. This reads args from + * SparkApplication instead of starting separate spark-submit process + */ +public class SparkAppSubmissionWorker { + // Default length limit for generated app id. Generated id is used as resource-prefix when + // user-provided id is too long for this purpose. This applied to all resources associated with + // the Spark app (including k8s service which has different naming length limit). This we + // truncate the hash part to 46 chars to leave some margin for spark resource prefix and suffix + // (e.g. 'spark-', '-driver-svc' . 
etc) + public static final int DEFAULT_ID_LENGTH_LIMIT = 46; + // Default length limit to be applied to the hash-based part of generated id + public static final int DEFAULT_HASH_BASED_IDENTIFIER_LENGTH_LIMIT = 36; + // Radix value used when generating hash-based identifier + public static final int DEFAULT_ENCODE_BASE = 36; + + public SparkAppResourceSpec getResourceSpec( + SparkApplication app, KubernetesClient client, Map confOverrides) { +SparkAppDriverConf appDriverConf = buildDriverConf(app, confOverrides); +return buildResourceSpec(appDriverConf, client); + } + + protected SparkAppDriverConf buildDriverConf( + SparkApplication app, Map confOverrides) { +ApplicationSpec applicationSpec = app.getSpec(); +SparkConf effectiveSparkConf = new SparkConf(); +if (MapUtils.isNotEmpty(applicationSpec.getSparkConf())) { + for (String confKey : applicationSpec.getSparkConf().keySet()) { +effectiveSparkConf.set(confKey, applicationSpec.getSparkConf().get(confKey)); + } +} +if (MapUtils.isNotEmpty(confOverrides)) { + for (Map.Entry entry : confOverrides.entrySet()) { +effectiveSparkConf.set(entry.getKey(), entry.getValue()); + } +} +effectiveSparkConf.set("spark.kubernetes.namespace", app.getMetadata().getNamespace()); +MainAppResource primaryResource = new JavaMainAppResource(Option.empty()); +if (StringUtils.isNotEmpty(applicationSpec.getJars())) { + primaryResource = new JavaMainAppResource(Option.apply(applicationSpec.getJars())); + effectiveSparkConf.setIfMissing("spark.jars", applicationSpec.getJars()); +} else if (StringUtils.isNotEmpty(applicationSpec.getPyFiles())) { + primaryResource = new PythonMainAppResource(applicationSpec.getPyFiles()); + effectiveSparkConf.setIfMissing("spark.submit.pyFiles", applicationSpec.getPyFiles()); +} else if (StringUtils.isNotEmpty(applicationSpec.getSparkRFiles())) { + primaryResource = new RMainAppResource(applicationSpec.getSparkRFiles()); +} +effectiveSparkConf.setIfMissing( +"spark.master", 
"k8s://https://$KUBERNETES_SERVICE_HOST:$KUBERNETES_SERVICE_PORT"); +String appId = generateSparkAppId(app); +effectiveSparkConf.setIfMissing("spark.app.id", appId); +return SparkAppDriverConf.create( +effectiveSparkConf, +appId, +
Re: [PR] [SPARK-48017] Add Spark application submission worker for operator [spark-kubernetes-operator]
dongjoon-hyun commented on code in PR #10: URL: https://github.com/apache/spark-kubernetes-operator/pull/10#discussion_r1605568720 ## spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorker.java: ## @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.spark.k8s.operator; + +import java.math.BigInteger; +import java.util.Map; + +import scala.Option; + +import io.fabric8.kubernetes.client.KubernetesClient; +import org.apache.commons.codec.digest.DigestUtils; +import org.apache.commons.collections.MapUtils; +import org.apache.commons.lang3.StringUtils; + +import org.apache.spark.SparkConf; +import org.apache.spark.deploy.k8s.KubernetesDriverSpec; +import org.apache.spark.deploy.k8s.submit.JavaMainAppResource; +import org.apache.spark.deploy.k8s.submit.KubernetesDriverBuilder; +import org.apache.spark.deploy.k8s.submit.MainAppResource; +import org.apache.spark.deploy.k8s.submit.PythonMainAppResource; +import org.apache.spark.deploy.k8s.submit.RMainAppResource; +import org.apache.spark.k8s.operator.spec.ApplicationSpec; +import org.apache.spark.k8s.operator.utils.ModelUtils; + +/** + * Similar to org.apache.spark.deploy.k8s.submit.KubernetesClientApplication. This reads args from + * SparkApplication instead of starting separate spark-submit process + */ +public class SparkAppSubmissionWorker { + // Default length limit for generated app id. Generated id is used as resource-prefix when + // user-provided id is too long for this purpose. This applied to all resources associated with + // the Spark app (including k8s service which has different naming length limit). This we + // truncate the hash part to 46 chars to leave some margin for spark resource prefix and suffix + // (e.g. 'spark-', '-driver-svc' . 
etc) + public static final int DEFAULT_ID_LENGTH_LIMIT = 46; + // Default length limit to be applied to the hash-based part of generated id + public static final int DEFAULT_HASH_BASED_IDENTIFIER_LENGTH_LIMIT = 36; + // Radix value used when generating hash-based identifier + public static final int DEFAULT_ENCODE_BASE = 36; + + public SparkAppResourceSpec getResourceSpec( + SparkApplication app, KubernetesClient client, Map confOverrides) { +SparkAppDriverConf appDriverConf = buildDriverConf(app, confOverrides); +return buildResourceSpec(appDriverConf, client); + } + + protected SparkAppDriverConf buildDriverConf( + SparkApplication app, Map confOverrides) { +ApplicationSpec applicationSpec = app.getSpec(); +SparkConf effectiveSparkConf = new SparkConf(); +if (MapUtils.isNotEmpty(applicationSpec.getSparkConf())) { + for (String confKey : applicationSpec.getSparkConf().keySet()) { +effectiveSparkConf.set(confKey, applicationSpec.getSparkConf().get(confKey)); + } +} +if (MapUtils.isNotEmpty(confOverrides)) { + for (Map.Entry entry : confOverrides.entrySet()) { +effectiveSparkConf.set(entry.getKey(), entry.getValue()); + } +} +effectiveSparkConf.set("spark.kubernetes.namespace", app.getMetadata().getNamespace()); +MainAppResource primaryResource = new JavaMainAppResource(Option.empty()); +if (StringUtils.isNotEmpty(applicationSpec.getJars())) { + primaryResource = new JavaMainAppResource(Option.apply(applicationSpec.getJars())); + effectiveSparkConf.setIfMissing("spark.jars", applicationSpec.getJars()); +} else if (StringUtils.isNotEmpty(applicationSpec.getPyFiles())) { + primaryResource = new PythonMainAppResource(applicationSpec.getPyFiles()); + effectiveSparkConf.setIfMissing("spark.submit.pyFiles", applicationSpec.getPyFiles()); +} else if (StringUtils.isNotEmpty(applicationSpec.getSparkRFiles())) { + primaryResource = new RMainAppResource(applicationSpec.getSparkRFiles()); +} +effectiveSparkConf.setIfMissing( +"spark.master", 
"k8s://https://$KUBERNETES_SERVICE_HOST:$KUBERNETES_SERVICE_PORT;); +String appId = generateSparkAppId(app); +effectiveSparkConf.setIfMissing("spark.app.id", appId); +return SparkAppDriverConf.create( +effectiveSparkConf, +appId, +
Re: [PR] [SPARK-48017] Add Spark application submission worker for operator [spark-kubernetes-operator]
dongjoon-hyun commented on code in PR #10: URL: https://github.com/apache/spark-kubernetes-operator/pull/10#discussion_r1605567704 ## spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorker.java: ## @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.spark.k8s.operator; + +import java.math.BigInteger; +import java.util.Map; + +import scala.Option; + +import io.fabric8.kubernetes.client.KubernetesClient; +import org.apache.commons.codec.digest.DigestUtils; +import org.apache.commons.collections.MapUtils; +import org.apache.commons.lang3.StringUtils; + +import org.apache.spark.SparkConf; +import org.apache.spark.deploy.k8s.KubernetesDriverSpec; +import org.apache.spark.deploy.k8s.submit.JavaMainAppResource; +import org.apache.spark.deploy.k8s.submit.KubernetesDriverBuilder; +import org.apache.spark.deploy.k8s.submit.MainAppResource; +import org.apache.spark.deploy.k8s.submit.PythonMainAppResource; +import org.apache.spark.deploy.k8s.submit.RMainAppResource; +import org.apache.spark.k8s.operator.spec.ApplicationSpec; +import org.apache.spark.k8s.operator.utils.ModelUtils; + +/** + * Similar to org.apache.spark.deploy.k8s.submit.KubernetesClientApplication. This reads args from + * SparkApplication instead of starting separate spark-submit process + */ +public class SparkAppSubmissionWorker { + // Default length limit for generated app id. Generated id is used as resource-prefix when + // user-provided id is too long for this purpose. This applied to all resources associated with + // the Spark app (including k8s service which has different naming length limit). This we + // truncate the hash part to 46 chars to leave some margin for spark resource prefix and suffix + // (e.g. 'spark-', '-driver-svc' . 
etc) + public static final int DEFAULT_ID_LENGTH_LIMIT = 46; + // Default length limit to be applied to the hash-based part of generated id + public static final int DEFAULT_HASH_BASED_IDENTIFIER_LENGTH_LIMIT = 36; + // Radix value used when generating hash-based identifier + public static final int DEFAULT_ENCODE_BASE = 36; + + public SparkAppResourceSpec getResourceSpec( + SparkApplication app, KubernetesClient client, Map confOverrides) { +SparkAppDriverConf appDriverConf = buildDriverConf(app, confOverrides); +return buildResourceSpec(appDriverConf, client); + } + + protected SparkAppDriverConf buildDriverConf( + SparkApplication app, Map confOverrides) { +ApplicationSpec applicationSpec = app.getSpec(); +SparkConf effectiveSparkConf = new SparkConf(); +if (MapUtils.isNotEmpty(applicationSpec.getSparkConf())) { + for (String confKey : applicationSpec.getSparkConf().keySet()) { +effectiveSparkConf.set(confKey, applicationSpec.getSparkConf().get(confKey)); + } +} +if (MapUtils.isNotEmpty(confOverrides)) { + for (Map.Entry entry : confOverrides.entrySet()) { +effectiveSparkConf.set(entry.getKey(), entry.getValue()); + } +} +effectiveSparkConf.set("spark.kubernetes.namespace", app.getMetadata().getNamespace()); +MainAppResource primaryResource = new JavaMainAppResource(Option.empty()); +if (StringUtils.isNotEmpty(applicationSpec.getJars())) { + primaryResource = new JavaMainAppResource(Option.apply(applicationSpec.getJars())); + effectiveSparkConf.setIfMissing("spark.jars", applicationSpec.getJars()); +} else if (StringUtils.isNotEmpty(applicationSpec.getPyFiles())) { + primaryResource = new PythonMainAppResource(applicationSpec.getPyFiles()); + effectiveSparkConf.setIfMissing("spark.submit.pyFiles", applicationSpec.getPyFiles()); +} else if (StringUtils.isNotEmpty(applicationSpec.getSparkRFiles())) { + primaryResource = new RMainAppResource(applicationSpec.getSparkRFiles()); +} +effectiveSparkConf.setIfMissing( +"spark.master", 
"k8s://https://$KUBERNETES_SERVICE_HOST:$KUBERNETES_SERVICE_PORT;); +String appId = generateSparkAppId(app); +effectiveSparkConf.setIfMissing("spark.app.id", appId); +return SparkAppDriverConf.create( +effectiveSparkConf, +appId, +
Re: [PR] [SPARK-48017] Add Spark application submission worker for operator [spark-kubernetes-operator]
dongjoon-hyun commented on code in PR #10: URL: https://github.com/apache/spark-kubernetes-operator/pull/10#discussion_r1605566899 ## spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppResourceSpec.java: ## @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.k8s.operator; + +import java.util.ArrayList; +import java.util.List; + +import scala.Tuple2; +import scala.collection.immutable.HashMap; +import scala.collection.immutable.Map; +import scala.jdk.CollectionConverters; + +import io.fabric8.kubernetes.api.model.Container; +import io.fabric8.kubernetes.api.model.ContainerBuilder; +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.PodBuilder; +import lombok.Getter; +import org.apache.commons.lang3.StringUtils; + +import org.apache.spark.deploy.k8s.Config; +import org.apache.spark.deploy.k8s.Constants; +import org.apache.spark.deploy.k8s.KubernetesDriverSpec; +import org.apache.spark.deploy.k8s.SparkPod; +import org.apache.spark.deploy.k8s.submit.KubernetesClientUtils; + +/** + * Resembles resources that would be directly launched by operator. 
Based on resolved + * org.apache.spark.deploy.k8s.KubernetesDriverSpec, it: + * + * + * Add ConfigMap as a resource for driver + * Converts scala types to Java for easier reference from operator + * + * + * This is not thread safe and not expected to be shared among reconciler threads + */ +public class SparkAppResourceSpec { + @Getter private final Pod configuredPod; + @Getter private final List driverPreResources; + @Getter private final List driverResources; + private final SparkAppDriverConf kubernetesDriverConf; + + public SparkAppResourceSpec( + SparkAppDriverConf kubernetesDriverConf, KubernetesDriverSpec kubernetesDriverSpec) { +this.kubernetesDriverConf = kubernetesDriverConf; +String namespace = kubernetesDriverConf.sparkConf().get(Config.KUBERNETES_NAMESPACE().key()); +Map confFilesMap = +KubernetesClientUtils.buildSparkConfDirFilesMap( +kubernetesDriverConf.configMapNameDriver(), +kubernetesDriverConf.sparkConf(), +kubernetesDriverSpec.systemProperties()) +.$plus(new Tuple2<>(Config.KUBERNETES_NAMESPACE().key(), namespace)); +SparkPod sparkPod = addConfigMap(kubernetesDriverSpec.pod(), confFilesMap); +this.configuredPod = +new PodBuilder(sparkPod.pod()) +.editSpec() +.addToContainers(sparkPod.container()) +.endSpec() +.build(); +this.driverPreResources = +new ArrayList<>( + CollectionConverters.SeqHasAsJava(kubernetesDriverSpec.driverPreKubernetesResources()) +.asJava()); +this.driverResources = +new ArrayList<>( + CollectionConverters.SeqHasAsJava(kubernetesDriverSpec.driverKubernetesResources()) +.asJava()); +this.driverResources.add( +KubernetesClientUtils.buildConfigMap( +kubernetesDriverConf.configMapNameDriver(), confFilesMap, new HashMap<>())); +this.driverPreResources.forEach(r -> setNamespaceIfMissing(r, namespace)); +this.driverResources.forEach(r -> setNamespaceIfMissing(r, namespace)); + } + + private void setNamespaceIfMissing(HasMetadata resource, String namespace) { +if (StringUtils.isNotEmpty(resource.getMetadata().getNamespace())) { + 
return; +} +resource.getMetadata().setNamespace(namespace); + } + + private SparkPod addConfigMap(SparkPod pod, Map confFilesMap) { +Container containerWithVolume = +new ContainerBuilder(pod.container()) +.addNewEnv() +.withName(Constants.ENV_SPARK_CONF_DIR()) +.withValue(Constants.SPARK_CONF_DIR_INTERNAL()) +.endEnv() +.addNewVolumeMount() +.withName(Constants.SPARK_CONF_VOLUME_DRIVER()) +.withMountPath(Constants.SPARK_CONF_DIR_INTERNAL()) +.endVolumeMount() +.build(); +Pod podWithVolume = Review Comment: It would be great to have more specific name like
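The `setNamespaceIfMissing` guard quoted above fills in a namespace only when the resource has none, so an explicitly namespaced resource is never rewritten. A minimal stand-in shows the behavior; the `Meta` class here is a hypothetical substitute for fabric8's `ObjectMeta`, used only to keep the sketch self-contained:

```java
public class NamespaceDefaulting {
    // Hypothetical stand-in for io.fabric8.kubernetes.api.model.ObjectMeta.
    static class Meta {
        String namespace;
    }

    // Mirrors the quoted setNamespaceIfMissing: an already-set namespace wins.
    static void setNamespaceIfMissing(Meta meta, String namespace) {
        if (meta.namespace != null && !meta.namespace.isEmpty()) {
            return; // resource is already pinned to a namespace
        }
        meta.namespace = namespace;
    }

    public static void main(String[] args) {
        Meta unset = new Meta();
        setNamespaceIfMissing(unset, "spark-apps");

        Meta preset = new Meta();
        preset.namespace = "other";
        setNamespaceIfMissing(preset, "spark-apps");

        System.out.println(unset.namespace + " " + preset.namespace); // spark-apps other
    }
}
```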
Re: [PR] [SPARK-48017] Add Spark application submission worker for operator [spark-kubernetes-operator]
dongjoon-hyun commented on code in PR #10: URL: https://github.com/apache/spark-kubernetes-operator/pull/10#discussion_r1605565719 ## spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorkerTest.java: ## @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.spark.k8s.operator; + +import static org.apache.spark.k8s.operator.SparkAppSubmissionWorker.DEFAULT_ID_LENGTH_LIMIT; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockConstruction; +import static org.mockito.Mockito.when; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import io.fabric8.kubernetes.api.model.ObjectMeta; +import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; +import org.apache.commons.lang3.RandomStringUtils; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.mockito.MockedConstruction; + +import org.apache.spark.SparkConf; +import org.apache.spark.deploy.k8s.submit.JavaMainAppResource; +import org.apache.spark.deploy.k8s.submit.PythonMainAppResource; +import org.apache.spark.deploy.k8s.submit.RMainAppResource; +import org.apache.spark.k8s.operator.spec.ApplicationSpec; +import org.apache.spark.k8s.operator.status.ApplicationAttemptSummary; +import org.apache.spark.k8s.operator.status.ApplicationStatus; +import org.apache.spark.k8s.operator.status.AttemptInfo; + +class SparkAppSubmissionWorkerTest { + @Test + void buildDriverConfShouldApplySpecAndPropertiesOverride() { +Map> constructorArgs = new HashMap<>(); +try (MockedConstruction mocked = +mockConstruction( +SparkAppDriverConf.class, +(mock, context) -> constructorArgs.put(mock, new ArrayList<>(context.arguments() { + SparkApplication mockApp = mock(SparkApplication.class); + ApplicationSpec mockSpec = mock(ApplicationSpec.class); + ObjectMeta appMeta = new ObjectMetaBuilder().withName("app1").withNamespace("ns1").build(); + Map appProps = new HashMap<>(); + appProps.put("foo", "bar"); + appProps.put("spark.executor.instances", "1"); + appProps.put("spark.kubernetes.namespace", "ns2"); + Map overrides = new HashMap<>(); + overrides.put("spark.executor.instances", "5"); + 
overrides.put("spark.kubernetes.namespace", "ns3"); Review Comment: Please add a new test coverage with a long namespace length. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-48017] Add Spark application submission worker for operator [spark-kubernetes-operator]
dongjoon-hyun commented on code in PR #10: URL: https://github.com/apache/spark-kubernetes-operator/pull/10#discussion_r1605565102 ## spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppResourceSpecTest.java: ## @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.spark.k8s.operator; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.Collections; +import java.util.List; + +import scala.collection.immutable.HashMap; +import scala.collection.immutable.Seq; +import scala.jdk.javaapi.CollectionConverters; + +import io.fabric8.kubernetes.api.model.ConfigMap; +import io.fabric8.kubernetes.api.model.Container; +import io.fabric8.kubernetes.api.model.ContainerBuilder; +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.PodBuilder; +import io.fabric8.kubernetes.api.model.Volume; +import io.fabric8.kubernetes.api.model.VolumeMount; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import org.apache.spark.SparkConf; +import org.apache.spark.deploy.k8s.KubernetesDriverSpec; +import org.apache.spark.deploy.k8s.SparkPod; + +class SparkAppResourceSpecTest { + + @Test + void testDriverResourceIncludesConfigMap() { +SparkAppDriverConf mockConf = mock(SparkAppDriverConf.class); +when(mockConf.configMapNameDriver()).thenReturn("foo-configmap"); +when(mockConf.sparkConf()) +.thenReturn(new SparkConf().set("spark.kubernetes.namespace", "foo-namespace")); + +KubernetesDriverSpec mockSpec = mock(KubernetesDriverSpec.class); +Pod driver = buildBasicPod("driver"); +SparkPod sparkPod = new SparkPod(driver, buildBasicContainer()); + +// Add some mock resources and pre-resources +Pod pod1 = buildBasicPod("pod-1"); +Pod pod2 = buildBasicPod("pod-2"); +List preResourceList = Collections.singletonList(pod1); +List resourceList = Collections.singletonList(pod2); +Seq preResourceSeq = CollectionConverters.asScala(preResourceList).toList(); +Seq resourceSeq = CollectionConverters.asScala(resourceList).toList(); +when(mockSpec.driverKubernetesResources()).thenReturn(resourceSeq); +when(mockSpec.driverPreKubernetesResources()).thenReturn(preResourceSeq); 
+when(mockSpec.pod()).thenReturn(sparkPod); +when(mockSpec.systemProperties()).thenReturn(new HashMap<>()); + +SparkAppResourceSpec appResourceSpec = new SparkAppResourceSpec(mockConf, mockSpec); + +Assertions.assertEquals(2, appResourceSpec.getDriverResources().size()); Review Comment: Shall we import `Assertions.assertEquals`?
Re: [PR] [SPARK-48017] Add Spark application submission worker for operator [spark-kubernetes-operator]
dongjoon-hyun commented on code in PR #10: URL: https://github.com/apache/spark-kubernetes-operator/pull/10#discussion_r1605564747 ## spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppDriverConfTest.java: ## @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.spark.k8s.operator; + +import static org.mockito.Mockito.mock; + +import java.util.UUID; + +import scala.Option; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import org.apache.spark.SparkConf; +import org.apache.spark.deploy.k8s.submit.JavaMainAppResource; + +class SparkAppDriverConfTest { + @Test + void testResourceNamePrefix() { +// Resource prefix shall be deterministic per SparkApp per attempt +SparkConf sparkConf = new SparkConf(); +sparkConf.set("foo", "bar"); +sparkConf.set("spark.executor.instances", "1"); +String appId = UUID.randomUUID().toString(); +SparkAppDriverConf sparkAppDriverConf = +SparkAppDriverConf.create( +sparkConf, appId, mock(JavaMainAppResource.class), "foo", null, Option.empty()); +String resourcePrefix = sparkAppDriverConf.resourceNamePrefix(); +Assertions.assertEquals(resourcePrefix, appId); + Assertions.assertTrue(sparkAppDriverConf.configMapNameDriver().contains(resourcePrefix)); Review Comment: Please add more test cases separately, at least for all public methods. For example, it would be great if `configMapNameDriver` had a new test case.
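A dedicated `configMapNameDriver` test, as the reviewer suggests, could pin down two properties: determinism for a given resource prefix, and a legal Kubernetes object-name length even for oversized prefixes. The `-conf-map` suffix and the 253-character cap below are assumptions for illustration, not taken from the PR:

```java
public class ConfigMapNameSketch {
    // k8s object names are capped at 253 characters (DNS subdomain limit);
    // using that cap here is an assumption of the sketch.
    static final int MAX_K8S_NAME_LENGTH = 253;

    // Hypothetical stand-in for configMapNameDriver: derive the ConfigMap name
    // from the resource prefix and clamp it to the k8s limit.
    static String configMapNameDriver(String resourcePrefix) {
        String name = resourcePrefix + "-conf-map";
        return name.substring(0, Math.min(MAX_K8S_NAME_LENGTH, name.length()));
    }

    public static void main(String[] args) {
        // Deterministic: same prefix, same name -- the property the quoted test relies on.
        System.out.println(configMapNameDriver("app-1").equals(configMapNameDriver("app-1"))); // true
        // Clamped: even an oversized prefix yields a name within the limit.
        System.out.println(configMapNameDriver("x".repeat(300)).length() <= MAX_K8S_NAME_LENGTH); // true
    }
}
```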
Re: [PR] [SPARK-48017] Add Spark application submission worker for operator [spark-kubernetes-operator]
dongjoon-hyun commented on code in PR #10: URL: https://github.com/apache/spark-kubernetes-operator/pull/10#discussion_r1605562742 ## spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorker.java: ## @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.spark.k8s.operator; + +import java.math.BigInteger; +import java.util.Map; + +import scala.Option; + +import io.fabric8.kubernetes.client.KubernetesClient; +import org.apache.commons.codec.digest.DigestUtils; +import org.apache.commons.collections.MapUtils; +import org.apache.commons.lang3.StringUtils; + +import org.apache.spark.SparkConf; +import org.apache.spark.deploy.k8s.KubernetesDriverSpec; +import org.apache.spark.deploy.k8s.submit.JavaMainAppResource; +import org.apache.spark.deploy.k8s.submit.KubernetesDriverBuilder; +import org.apache.spark.deploy.k8s.submit.MainAppResource; +import org.apache.spark.deploy.k8s.submit.PythonMainAppResource; +import org.apache.spark.deploy.k8s.submit.RMainAppResource; +import org.apache.spark.k8s.operator.spec.ApplicationSpec; +import org.apache.spark.k8s.operator.utils.ModelUtils; + +/** + * Similar to org.apache.spark.deploy.k8s.submit.KubernetesClientApplication. This reads args from + * SparkApplication instead of starting separate spark-submit process + */ +public class SparkAppSubmissionWorker { + // Default length limit for generated app id. Generated id is used as resource-prefix when + // user-provided id is too long for this purpose. This applied to all resources associated with + // the Spark app (including k8s service which has different naming length limit). This we + // truncate the hash part to 46 chars to leave some margin for spark resource prefix and suffix + // (e.g. 'spark-', '-driver-svc' . 
etc) + public static final int DEFAULT_ID_LENGTH_LIMIT = 46; + // Default length limit to be applied to the hash-based part of generated id + public static final int DEFAULT_HASH_BASED_IDENTIFIER_LENGTH_LIMIT = 36; + // Radix value used when generating hash-based identifier + public static final int DEFAULT_ENCODE_BASE = 36; + + public SparkAppResourceSpec getResourceSpec( + SparkApplication app, KubernetesClient client, Map confOverrides) { +SparkAppDriverConf appDriverConf = buildDriverConf(app, confOverrides); +return buildResourceSpec(appDriverConf, client); + } + + protected SparkAppDriverConf buildDriverConf( + SparkApplication app, Map confOverrides) { +ApplicationSpec applicationSpec = app.getSpec(); +SparkConf effectiveSparkConf = new SparkConf(); +if (MapUtils.isNotEmpty(applicationSpec.getSparkConf())) { + for (String confKey : applicationSpec.getSparkConf().keySet()) { +effectiveSparkConf.set(confKey, applicationSpec.getSparkConf().get(confKey)); + } +} +if (MapUtils.isNotEmpty(confOverrides)) { + for (Map.Entry entry : confOverrides.entrySet()) { +effectiveSparkConf.set(entry.getKey(), entry.getValue()); + } +} +effectiveSparkConf.set("spark.kubernetes.namespace", app.getMetadata().getNamespace()); +MainAppResource primaryResource = new JavaMainAppResource(Option.empty()); +if (StringUtils.isNotEmpty(applicationSpec.getJars())) { + primaryResource = new JavaMainAppResource(Option.apply(applicationSpec.getJars())); + effectiveSparkConf.setIfMissing("spark.jars", applicationSpec.getJars()); +} else if (StringUtils.isNotEmpty(applicationSpec.getPyFiles())) { + primaryResource = new PythonMainAppResource(applicationSpec.getPyFiles()); + effectiveSparkConf.setIfMissing("spark.submit.pyFiles", applicationSpec.getPyFiles()); +} else if (StringUtils.isNotEmpty(applicationSpec.getSparkRFiles())) { + primaryResource = new RMainAppResource(applicationSpec.getSparkRFiles()); +} +effectiveSparkConf.setIfMissing( +"spark.master", 
"k8s://https://$KUBERNETES_SERVICE_HOST:$KUBERNETES_SERVICE_PORT;); +String appId = generateSparkAppId(app); +effectiveSparkConf.setIfMissing("spark.app.id", appId); +return SparkAppDriverConf.create( +effectiveSparkConf, +appId, +
Re: [PR] [SPARK-48017] Add Spark application submission worker for operator [spark-kubernetes-operator]
dongjoon-hyun commented on code in PR #10: URL: https://github.com/apache/spark-kubernetes-operator/pull/10#discussion_r1605560525 ## spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorker.java: ## @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.spark.k8s.operator; + +import java.math.BigInteger; +import java.util.Map; + +import scala.Option; + +import io.fabric8.kubernetes.client.KubernetesClient; +import org.apache.commons.codec.digest.DigestUtils; +import org.apache.commons.collections.MapUtils; +import org.apache.commons.lang3.StringUtils; + +import org.apache.spark.SparkConf; +import org.apache.spark.deploy.k8s.KubernetesDriverSpec; +import org.apache.spark.deploy.k8s.submit.JavaMainAppResource; +import org.apache.spark.deploy.k8s.submit.KubernetesDriverBuilder; +import org.apache.spark.deploy.k8s.submit.MainAppResource; +import org.apache.spark.deploy.k8s.submit.PythonMainAppResource; +import org.apache.spark.deploy.k8s.submit.RMainAppResource; +import org.apache.spark.k8s.operator.spec.ApplicationSpec; +import org.apache.spark.k8s.operator.utils.ModelUtils; + +/** + * Similar to org.apache.spark.deploy.k8s.submit.KubernetesClientApplication. This reads args from + * SparkApplication instead of starting separate spark-submit process + */ +public class SparkAppSubmissionWorker { + // Default length limit for generated app id. Generated id is used as resource-prefix when + // user-provided id is too long for this purpose. This applied to all resources associated with + // the Spark app (including k8s service which has different naming length limit). This we + // truncate the hash part to 46 chars to leave some margin for spark resource prefix and suffix + // (e.g. 'spark-', '-driver-svc' . 
etc) + public static final int DEFAULT_ID_LENGTH_LIMIT = 46; + // Default length limit to be applied to the hash-based part of generated id + public static final int DEFAULT_HASH_BASED_IDENTIFIER_LENGTH_LIMIT = 36; + // Radix value used when generating hash-based identifier + public static final int DEFAULT_ENCODE_BASE = 36; + + public SparkAppResourceSpec getResourceSpec( + SparkApplication app, KubernetesClient client, Map confOverrides) { +SparkAppDriverConf appDriverConf = buildDriverConf(app, confOverrides); +return buildResourceSpec(appDriverConf, client); + } + + protected SparkAppDriverConf buildDriverConf( + SparkApplication app, Map confOverrides) { +ApplicationSpec applicationSpec = app.getSpec(); +SparkConf effectiveSparkConf = new SparkConf(); +if (MapUtils.isNotEmpty(applicationSpec.getSparkConf())) { + for (String confKey : applicationSpec.getSparkConf().keySet()) { +effectiveSparkConf.set(confKey, applicationSpec.getSparkConf().get(confKey)); + } +} +if (MapUtils.isNotEmpty(confOverrides)) { + for (Map.Entry entry : confOverrides.entrySet()) { +effectiveSparkConf.set(entry.getKey(), entry.getValue()); + } +} +effectiveSparkConf.set("spark.kubernetes.namespace", app.getMetadata().getNamespace()); +MainAppResource primaryResource = new JavaMainAppResource(Option.empty()); +if (StringUtils.isNotEmpty(applicationSpec.getJars())) { + primaryResource = new JavaMainAppResource(Option.apply(applicationSpec.getJars())); + effectiveSparkConf.setIfMissing("spark.jars", applicationSpec.getJars()); +} else if (StringUtils.isNotEmpty(applicationSpec.getPyFiles())) { + primaryResource = new PythonMainAppResource(applicationSpec.getPyFiles()); + effectiveSparkConf.setIfMissing("spark.submit.pyFiles", applicationSpec.getPyFiles()); +} else if (StringUtils.isNotEmpty(applicationSpec.getSparkRFiles())) { + primaryResource = new RMainAppResource(applicationSpec.getSparkRFiles()); +} +effectiveSparkConf.setIfMissing( +"spark.master", 
"k8s://https://$KUBERNETES_SERVICE_HOST:$KUBERNETES_SERVICE_PORT;); Review Comment: To @jiangzho and @aaruna, as we know, Apache Spark's `ExternalClusterManager` allows a custom K8s-based external cluster manager. So, it would be great if `Spark K8s Operator` has a
Re: [PR] [SPARK-48017] Add Spark application submission worker for operator [spark-kubernetes-operator]
dongjoon-hyun commented on code in PR #10: URL: https://github.com/apache/spark-kubernetes-operator/pull/10#discussion_r160424 ## spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorker.java: ## @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.spark.k8s.operator; + +import java.math.BigInteger; +import java.util.Map; + +import scala.Option; + +import io.fabric8.kubernetes.client.KubernetesClient; +import org.apache.commons.codec.digest.DigestUtils; +import org.apache.commons.collections.MapUtils; +import org.apache.commons.lang3.StringUtils; + +import org.apache.spark.SparkConf; +import org.apache.spark.deploy.k8s.KubernetesDriverSpec; +import org.apache.spark.deploy.k8s.submit.JavaMainAppResource; +import org.apache.spark.deploy.k8s.submit.KubernetesDriverBuilder; +import org.apache.spark.deploy.k8s.submit.MainAppResource; +import org.apache.spark.deploy.k8s.submit.PythonMainAppResource; +import org.apache.spark.deploy.k8s.submit.RMainAppResource; +import org.apache.spark.k8s.operator.spec.ApplicationSpec; +import org.apache.spark.k8s.operator.utils.ModelUtils; + +/** + * Similar to org.apache.spark.deploy.k8s.submit.KubernetesClientApplication. This reads args from + * SparkApplication instead of starting separate spark-submit process + */ +public class SparkAppSubmissionWorker { + // Default length limit for generated app id. Generated id is used as resource-prefix when + // user-provided id is too long for this purpose. This applied to all resources associated with + // the Spark app (including k8s service which has different naming length limit). This we Review Comment: `This we`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
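The constants quoted above describe a scheme where a hash-based part of the generated app id is rendered in base 36 and truncated, so that the overall resource prefix (plus additions like `spark-` and `-driver-svc`) stays within Kubernetes naming limits. A minimal sketch of that truncation idea — the helper name `truncatedBase36Hash` and the use of MD5 here are illustrative assumptions, not the operator's actual implementation:

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;

public class AppIdSketch {
  // Mirrors the quoted constants: hash part capped at 36 chars, base-36 radix.
  static final int HASH_LENGTH_LIMIT = 36;
  static final int ENCODE_BASE = 36;

  // Hypothetical helper: hash the input, render it in base 36, truncate.
  static String truncatedBase36Hash(String input) {
    try {
      byte[] digest = MessageDigest.getInstance("MD5")
          .digest(input.getBytes(StandardCharsets.UTF_8));
      // BigInteger(1, ...) treats the digest bytes as one unsigned value.
      String encoded = new BigInteger(1, digest).toString(ENCODE_BASE);
      return encoded.length() <= HASH_LENGTH_LIMIT
          ? encoded
          : encoded.substring(0, HASH_LENGTH_LIMIT);
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

  public static void main(String[] args) {
    String id = truncatedBase36Hash("my-spark-app");
    // The 46-char overall limit leaves margin for prefixes/suffixes that
    // derived resource names (e.g. the driver service) append.
    System.out.println(id + " (" + id.length() + " chars)");
  }
}
```

The key property is determinism: the same application always maps to the same truncated identifier, which is what makes it usable as a resource prefix.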
Re: [PR] [SPARK-48017] Add Spark application submission worker for operator [spark-kubernetes-operator]
dongjoon-hyun commented on code in PR #10: URL: https://github.com/apache/spark-kubernetes-operator/pull/10#discussion_r1605551694 ## spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppDriverConf.java: ## @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.spark.k8s.operator; + +import scala.Option; + +import org.apache.spark.SparkConf; +import org.apache.spark.deploy.k8s.Config; +import org.apache.spark.deploy.k8s.KubernetesDriverConf; +import org.apache.spark.deploy.k8s.KubernetesVolumeUtils; +import org.apache.spark.deploy.k8s.submit.KubernetesClientUtils; +import org.apache.spark.deploy.k8s.submit.MainAppResource; + +public class SparkAppDriverConf extends KubernetesDriverConf { + private SparkAppDriverConf( + SparkConf sparkConf, + String appId, + MainAppResource mainAppResource, + String mainClass, + String[] appArgs, + Option proxyUser) { +super(sparkConf, appId, mainAppResource, mainClass, appArgs, proxyUser, null); + } + + public static SparkAppDriverConf create( + SparkConf sparkConf, + String appId, + MainAppResource mainAppResource, + String mainClass, + String[] appArgs, + Option proxyUser) { +// pre-create check only +KubernetesVolumeUtils.parseVolumesWithPrefix( +sparkConf, Config.KUBERNETES_EXECUTOR_VOLUMES_PREFIX()); +return new SparkAppDriverConf(sparkConf, appId, mainAppResource, mainClass, appArgs, proxyUser); + } + + /** Application managed by operator has a deterministic prefix */ + @Override + public String resourceNamePrefix() { +return appId(); + } + + public String configMapNameDriver() { Review Comment: When we make a new K8s resource name, we should guarantee that it complies with the K8s naming limit. Could you add a method description stating the range of string lengths this method can return?
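The naming-limit concern above refers to the general Kubernetes constraint that most object names must be valid DNS subdomains of at most 253 characters (and some, like Service names, are DNS labels capped at 63). A hedged sketch of the kind of guard the reviewer is asking for — the class name, method name, and `-driver-conf-map` suffix here are illustrative, not the PR's actual API:

```java
public class K8sNames {
  // DNS subdomain limit applying to most K8s object names (RFC 1123).
  static final int MAX_SUBDOMAIN_LENGTH = 253;

  // Hypothetical guard: derive a ConfigMap name from the resource prefix
  // and fail fast if it would exceed the K8s object-name limit.
  static String configMapName(String resourcePrefix) {
    String name = resourcePrefix + "-driver-conf-map";
    if (name.length() > MAX_SUBDOMAIN_LENGTH) {
      throw new IllegalArgumentException(
          "resource prefix too long for a K8s object name: " + name.length());
    }
    return name;
  }

  public static void main(String[] args) {
    System.out.println(configMapName("spark-pi"));
  }
}
```

Documenting the maximum length of each derived name (prefix length plus suffix length) makes it easy to verify the whole family of generated resources stays within the limit.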
Re: [PR] [SPARK-48017] Add Spark application submission worker for operator [spark-kubernetes-operator]
dongjoon-hyun commented on code in PR #10: URL: https://github.com/apache/spark-kubernetes-operator/pull/10#discussion_r1605547679 ## spark-operator-api/src/main/java/org/apache/spark/k8s/operator/utils/ModelUtils.java: ## @@ -107,4 +108,12 @@ public static boolean overrideExecutorTemplateEnabled(ApplicationSpec applicatio && applicationSpec.getExecutorSpec() != null && applicationSpec.getExecutorSpec().getPodTemplateSpec() != null; } + + public static long getAttemptId(final SparkApplication app) { Review Comment: Thank you for adding `final`.
Re: [PR] [SPARK-48017] Add Spark application submission worker for operator [spark-kubernetes-operator]
dongjoon-hyun commented on code in PR #10: URL: https://github.com/apache/spark-kubernetes-operator/pull/10#discussion_r1605544109 ## gradle.properties: ## @@ -18,17 +18,23 @@ group=org.apache.spark.k8s.operator version=0.1.0 +# Caution: fabric8 version should be aligned with Spark dependency fabric8Version=6.12.1 commonsLang3Version=3.14.0 commonsIOVersion=2.16.1 lombokVersion=1.18.32 +#Spark Review Comment: We need a space. - `#Spark` -> `# Spark`.
Re: [PR] [SPARK-48017] Add Spark application submission worker for operator [spark-kubernetes-operator]
dongjoon-hyun commented on code in PR #10: URL: https://github.com/apache/spark-kubernetes-operator/pull/10#discussion_r1605543257 ## build.gradle: ## @@ -25,6 +25,11 @@ subprojects { repositories { mavenCentral() +// This is a workaround to resolve Spark 4.0.0-preview-1 +// To be removed for official release Review Comment: Apache Spark recommends using `IDed TODO`s. Please file a JIRA issue and use it like the following. ``` - // To be removed for official release + // TODO(SPARK-X) Use Apache Spark 4.0.0-preview1 when it's ready ```
Re: [PR] [SPARK-48017] Add Spark application submission worker for operator [spark-kubernetes-operator]
dongjoon-hyun commented on code in PR #10: URL: https://github.com/apache/spark-kubernetes-operator/pull/10#discussion_r1605542332 ## build.gradle: ## @@ -25,6 +25,11 @@ subprojects { repositories { mavenCentral() +// This is a workaround to resolve Spark 4.0.0-preview-1 Review Comment: `4.0.0-preview-1` -> `4.0.0-preview1` - https://github.com/apache/spark/releases/tag/v4.0.0-preview1-rc1
Re: [PR] [SPARK-48017] Add Spark application submission worker for operator [spark-kubernetes-operator]
dongjoon-hyun commented on code in PR #10: URL: https://github.com/apache/spark-kubernetes-operator/pull/10#discussion_r1605541866 ## gradle.properties: ## @@ -18,17 +18,23 @@ group=org.apache.spark.k8s.operator version=0.1.0 +# Caution: fabric8 version should be aligned with Spark dependency fabric8Version=6.12.1 commonsLang3Version=3.14.0 commonsIOVersion=2.16.1 lombokVersion=1.18.32 +#Spark +scalaVersion=2.13 +sparkVersion=4.0.0-preview1 Review Comment: > @dongjoon-hyun - can we use rc1 for submission worker for now, in order to unblock the operator module ? > > I'll be more than happy to upgrade & fix possible incompatibility when RC2 is ready. Thank you, @jiangzho . Thank you for updating to use RC1 first.
Re: [PR] [SPARK-47353][SQL] Enable collation support for the Mode expression using GroupMapReduce [V2] [spark]
GideonPotok commented on code in PR #46597: URL: https://github.com/apache/spark/pull/46597#discussion_r1605523709 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala: ## @@ -74,16 +80,28 @@ case class Mode( if (buffer.isEmpty) { return null } - +val collationAwareBuffer = + if (!CollationFactory.fetchCollation(collationId).supportsBinaryEquality) { +val modeMap = buffer.toSeq.groupMapReduce { + case (key: String, _) => +CollationFactory.getCollationKey(UTF8String.fromString(key), collationId) + case (key: UTF8String, _) => +CollationFactory.getCollationKey(key, collationId) + case (key, _) => key +}(x => x)((x, y) => (x._1, x._2 + y._2)).values +modeMap + } else { +buffer + } Review Comment: @uros-db done ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala: ## @@ -41,6 +42,11 @@ case class Mode( this(child, 0, 0, Some(reverse)) } + final lazy val collationId: Int = child.dataType match { +case c: StringType => c.collationId +case _ => CollationFactory.UTF8_BINARY_COLLATION_ID + } Review Comment: @uros-db done
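The `groupMapReduce` change quoted above buckets values by their collation key before counting, so strings that compare equal under the collation share one bucket whose counts are summed. A Java sketch of the same shape, using lower-casing as a stand-in for `CollationFactory.getCollationKey` (which is internal to Spark); the class and method names are illustrative:

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

public class CollationAwareMode {
  // Group (value -> count) pairs by a normalized key, sum counts per bucket,
  // then return the representative value of the most frequent bucket --
  // the same shape as the groupMapReduce in Mode.eval.
  static String mode(Map<String, Long> buffer) {
    Map<String, Map.Entry<String, Long>> buckets = new HashMap<>();
    for (Map.Entry<String, Long> e : buffer.entrySet()) {
      // Stand-in for CollationFactory.getCollationKey(key, collationId).
      String collationKey = e.getKey().toLowerCase(Locale.ROOT);
      // Keep one representative value, add up the counts (x._2 + y._2).
      buckets.merge(collationKey, e, (a, b) ->
          Map.entry(a.getKey(), a.getValue() + b.getValue()));
    }
    return buckets.values().stream()
        .max(Map.Entry.comparingByValue())
        .map(Map.Entry::getKey)
        .orElse(null);
  }

  public static void main(String[] args) {
    // "AAA" and "aaa" fall into one bucket of total count 4, beating "bbb".
    System.out.println(mode(Map.of("AAA", 2L, "aaa", 2L, "bbb", 3L)));
  }
}
```

Note the representative value of a bucket depends on iteration order, mirroring the reduce step `(x, y) => (x._1, x._2 + y._2)` in the Scala diff, which keeps whichever element was seen first.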
Re: [PR] [SPARK-47353][SQL] Enable collation support for the Mode expression using GroupMapReduce [V2] [spark]
GideonPotok commented on PR #46597: URL: https://github.com/apache/spark/pull/46597#issuecomment-2118330617 > since Mode expression works with any child expression, and you special-cased handling Strings, how do we handle Array(String) and Struct(String), etc.? In my local tests, I found that Mode performs a byte-by-byte comparison for structs, which does not consider collation. So that is still outstanding. Good catch! @uros-db There are several strategies we might adopt to handle structs with collation fields. I am looking into implementations. It is potentially straightforward, though it has some gotchas. Do you feel I should solve that in a separate PR or in this one? I assume you prefer that this get solved in this PR and not a follow-up PR, right?
Re: [PR] [SPARK-48258][PYTHON][CONNECT] Checkpoint and localCheckpoint in Spark Connect [spark]
hvanhovell commented on code in PR #46570: URL: https://github.com/apache/spark/pull/46570#discussion_r1605509972 ## connector/connect/common/src/main/protobuf/spark/connect/base.proto: ## @@ -199,6 +200,17 @@ message AnalyzePlanRequest { // (Required) The logical plan to get the storage level. Relation relation = 1; } + + message Checkpoint { Review Comment: Caching is lazy, so it cannot trigger execution of the main query. Checkpoint can actually do this, which is weird for an analyze request. Moreover, analyze does not work well with long-running operations (i.e. an eager checkpoint) for the following reasons: - Analyze needs a thread in the server's thread pool, which in extreme cases can lead to exhaustion, or in a less extreme scenario can lead to reduced availability. - Long-running requests can be killed by intermediate proxies. Analyze does not support reattach, which is an issue when the checkpoint takes too long. In that case you are basically dead in the water. The argument that ExecutePlanResponse does not support this is IMO a bit flimsy. Look for example at the `SqlCommandResult`: this can be returned as part of the `ExecutePlanResponse` and it returns a relation.
Re: [PR] [SPARK-47818][CONNECT][FOLLOW-UP] Introduce plan cache in SparkConnectPlanner to improve performance of Analyze requests [spark]
hvanhovell closed pull request #46638: [SPARK-47818][CONNECT][FOLLOW-UP] Introduce plan cache in SparkConnectPlanner to improve performance of Analyze requests URL: https://github.com/apache/spark/pull/46638
Re: [PR] [SPARK-48238][BUILD][YARN] Replace AmIpFilter with re-implemented YarnAMIpFilter [spark]
pan3793 commented on code in PR #46611: URL: https://github.com/apache/spark/pull/46611#discussion_r1605495540 ## resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAMIpFilter.scala: ## @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.yarn + +import java.io.IOException +import java.net.{HttpURLConnection, InetAddress, MalformedURLException, UnknownHostException, URL} +import java.security.Principal +import java.util.concurrent.TimeUnit + +import jakarta.servlet.{Filter, FilterChain, FilterConfig, ServletException, ServletRequest, ServletResponse} +import jakarta.servlet.http.{HttpServletRequest, HttpServletRequestWrapper, HttpServletResponse} +import org.apache.commons.lang3.StringUtils +import org.apache.hadoop.security.UserGroupInformation + +import org.apache.spark.internal.Logging + +// This class is inspired by org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter Review Comment: I was considering porting the code as-is (also keeping the package and class name) as we did in the `thriftserver` module for `hive-service`, but it would cause class conflicts because `hadoop-client-minicluster` (present in test scope) ships those classes too.
Meanwhile, I found an existing `YarnProxyRedirectFilter`, which is much simpler and is not coupled with Hadoop classes. So I renamed the class to `YarnAMIpFilter`, rewrote it in Scala, and put it alongside `YarnProxyRedirectFilter`. The UT `YarnAMIpFilterSuite` was ported too, to ensure the correctness of the rewrite. @cloud-fan if we pursue sync with Hadoop upstream, I can convert it back to Java classes and reuse the Hadoop code as much as possible. For example, use `org.apache.hadoop.yarn.webapp.hamlet2.Hamlet` to construct the HTML page in `sendRedirect` instead of writing it by hand as we did in `YarnProxyRedirectFilter`.
Re: [PR] [SPARK-46841][SQL] Add collation support for ICU locales and collation specifiers [spark]
mkaravel commented on code in PR #46180: URL: https://github.com/apache/spark/pull/46180#discussion_r1605449462

## common/unsafe/src/main/java/org/apache/spark/sql/catalyst/util/CollationFactory.java:
## @@ -118,76 +119,433 @@ public Collation(
 }

 /**
- * Constructor with comparators that are inherited from the given collator.
+ * Collation id is defined as 32-bit integer.
+ * We specify binary layouts for different classes of collations.
+ * Classes of collations are differentiated by most significant 3 bits (bit 31, 30 and 29),
+ * bit 31 being most significant and bit 0 being least significant.
+ * ---
+ * INDETERMINATE collation id binary layout:
+ * bit 31-0: 1
+ * INDETERMINATE collation id is equal to -1
+ * ---
+ * user-defined collation id binary layout:
+ * bit 31: 0
+ * bit 30: 1
+ * bit 29-0: undefined, reserved for future use
+ * ---
+ * UTF8_BINARY collation id binary layout:
+ * bit 31-22: zeroes
+ * bit 21-18: zeroes, reserved for space trimming
+ * bit 17-16: zeroes, reserved for version
+ * bit 15-3: zeroes
+ * bit 2: 0, reserved for accent sensitivity
+ * bit 1: 0, reserved for uppercase and case-insensitive
+ * bit 0: 0 = case-sensitive, 1 = lowercase
+ * ---
+ * ICU collation id binary layout:
+ * bit 31-30: zeroes
+ * bit 29: 1
+ * bit 28-24: zeroes
+ * bit 23-22: zeroes, reserved for version

Review Comment: Is there a good reason why version appears at different positions for the UTF8_BINARY family and ICU collations?

## common/unsafe/src/main/java/org/apache/spark/sql/catalyst/util/CollationFactory.java:
## @@ -118,76 +119,433 @@ public Collation(
 }

 /**
- * Constructor with comparators that are inherited from the given collator.
+ * Collation id is defined as 32-bit integer.
+ * We specify binary layouts for different classes of collations.
+ * Classes of collations are differentiated by most significant 3 bits (bit 31, 30 and 29),
+ * bit 31 being most significant and bit 0 being least significant.
+ * ---
+ * INDETERMINATE collation id binary layout:
+ * bit 31-0: 1
+ * INDETERMINATE collation id is equal to -1
+ * ---
+ * user-defined collation id binary layout:
+ * bit 31: 0
+ * bit 30: 1
+ * bit 29-0: undefined, reserved for future use
+ * ---
+ * UTF8_BINARY collation id binary layout:
+ * bit 31-22: zeroes
+ * bit 21-18: zeroes, reserved for space trimming
+ * bit 17-16: zeroes, reserved for version
+ * bit 15-3: zeroes
+ * bit 2: 0, reserved for accent sensitivity
+ * bit 1: 0, reserved for uppercase and case-insensitive
+ * bit 0: 0 = case-sensitive, 1 = lowercase
+ * ---
+ * ICU collation id binary layout:
+ * bit 31-30: zeroes
+ * bit 29: 1
+ * bit 28-24: zeroes
+ * bit 23-22: zeroes, reserved for version
+ * bit 21-18: zeroes, reserved for space trimming
+ * bit 17: 0 = case-sensitive, 1 = case-insensitive
+ * bit 16: 0 = accent-sensitive, 1 = accent-insensitive
+ * bit 15-14: zeroes, reserved for punctuation sensitivity
+ * bit 13-12: zeroes, reserved for first letter preference
+ * bit 11-0: locale id as specified in `ICULocaleToId` mapping
+ * ---
+ * Some illustrative examples of collation name to id mapping:
+ * - UTF8_BINARY -> 0
+ * - UTF8_BINARY_LCASE -> 1
+ * - UNICODE -> 0x2000
+ * - UNICODE_AI -> 0x2001
+ * - UNICODE_CI -> 0x2002
+ * - UNICODE_CI_AI -> 0x2003
+ * - af -> 0x2001
+ * - af_CI_AI -> 0x20030001
+ */
-public Collation(
-String collationName,
-Collator collator,
-String version,
-boolean supportsBinaryEquality,
-boolean supportsBinaryOrdering,
-boolean supportsLowercaseEquality) {
- this(
-collationName,
-collator,
-(s1, s2) -> collator.compare(s1.toString(), s2.toString()),
-version,
-s -> (long)collator.getCollationKey(s.toString()).hashCode(),
-supportsBinaryEquality,
-supportsBinaryOrdering,
-supportsLowercaseEquality);
+private abstract static class CollationSpec {
+
+ private enum DefinitionOrigin {

Review Comment: I do not want to block this PR for this, but I find the lack of comments for variables, inner classes, methods, etc. quite problematic. The only way to understand what is going on is by looking at the code, which could be okay in some cases, but it definitely does not help in understanding how the different pieces fit together.

## common/unsafe/src/test/scala/org/apache/spark/unsafe/types/CollationFactorySuite.scala:
## @@ -152,4 +219,218 @@ class CollationFactorySuite extends AnyFunSuite with
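[Editor's note] To make the bit layout above concrete, here is a small, hypothetical Java sketch (not `CollationFactory`'s real API) that packs and unpacks an ICU collation id per the documented positions: bit 29 marks the ICU class, bit 17 case-insensitivity, bit 16 accent-insensitivity, and bits 11-0 the locale id.

```java
public class IcuCollationIdSketch {
    static final int ICU_CLASS_BIT = 1 << 29;      // bit 29: ICU collation class
    static final int CASE_INSENSITIVE = 1 << 17;   // bit 17: 1 = case-insensitive
    static final int ACCENT_INSENSITIVE = 1 << 16; // bit 16: 1 = accent-insensitive
    static final int LOCALE_MASK = 0xFFF;          // bits 11-0: locale id

    // Combine the class bit, sensitivity flags, and locale id into one 32-bit id.
    static int pack(int localeId, boolean ci, boolean ai) {
        int id = ICU_CLASS_BIT | (localeId & LOCALE_MASK);
        if (ci) id |= CASE_INSENSITIVE;
        if (ai) id |= ACCENT_INSENSITIVE;
        return id;
    }

    static boolean isCaseInsensitive(int id) { return (id & CASE_INSENSITIVE) != 0; }
    static boolean isAccentInsensitive(int id) { return (id & ACCENT_INSENSITIVE) != 0; }
    static int localeId(int id) { return id & LOCALE_MASK; }

    public static void main(String[] args) {
        // Locale id 0x001 with both CI and AI set, matching the doc's
        // "af_CI_AI -> 0x20030001" example.
        int id = pack(0x001, true, true);
        System.out.println(Integer.toHexString(id)); // 20030001
        System.out.println(isCaseInsensitive(id) && isAccentInsensitive(id)); // true
        System.out.println(localeId(id)); // 1
    }
}
```

The packed value reproduces the `af_CI_AI -> 0x20030001` example from the quoted doc comment: bit 29 contributes `0x20000000`, bits 17 and 16 contribute `0x30000`, and the locale id contributes `0x001`.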
Re: [PR] [SPARK-48238][BUILD][YARN] Replace AmIpFilter with re-implemented YarnAMIpFilter [spark]
pan3793 commented on code in PR #46611: URL: https://github.com/apache/spark/pull/46611#discussion_r1605458503

## resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAMIpFilter.scala:
## @@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.io.IOException
+import java.net.{HttpURLConnection, InetAddress, MalformedURLException, UnknownHostException, URL}
+import java.security.Principal
+import java.util.concurrent.TimeUnit
+
+import jakarta.servlet.{Filter, FilterChain, FilterConfig, ServletException, ServletRequest, ServletResponse}
+import jakarta.servlet.http.{HttpServletRequest, HttpServletRequestWrapper, HttpServletResponse}
+import org.apache.commons.lang3.StringUtils
+import org.apache.hadoop.security.UserGroupInformation
+
+import org.apache.spark.internal.Logging
+
+// This class is inspired by org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter

Review Comment: This is a functionally equivalent rewrite. It simplifies the code by converting it to Scala and merging code from several Hadoop Java classes.
Re: [PR] [SPARK-46841][SQL] Add collation support for ICU locales and collation specifiers [spark]
mkaravel commented on code in PR #46180: URL: https://github.com/apache/spark/pull/46180#discussion_r160575

## connector/connect/common/src/main/protobuf/spark/connect/types.proto:
## @@ -101,7 +101,7 @@ message DataType {
 message String {
 uint32 type_variation_reference = 1;
-uint32 collation_id = 2;
+string collation = 2;

Review Comment: It took me a bit, but I get it now. Thanks!
[PR] [WIP] OperatorStateMetadata [spark]
ericm-db opened a new pull request, #46645: URL: https://github.com/apache/spark/pull/46645

### What changes were proposed in this pull request?
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
### Was this patch authored or co-authored using generative AI tooling?
[PR] [WIP] [spark]
ericm-db opened a new pull request, #46644: URL: https://github.com/apache/spark/pull/46644

### What changes were proposed in this pull request?
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
### Was this patch authored or co-authored using generative AI tooling?
[PR] [SPARK-48294][SQL][3.5] Handle lowercase in nestedTypeMissingElementTypeError [spark]
michaelzhan-db opened a new pull request, #46643: URL: https://github.com/apache/spark/pull/46643

### What changes were proposed in this pull request?
Backport of #46623. Handle lowercase values inside of nestedTypeMissingElementTypeError to prevent match errors.

### Why are the changes needed?
The previous match error was not user-friendly. Now it gives an actionable `INCOMPLETE_TYPE_DEFINITION` error.

### Does this PR introduce _any_ user-facing change?
N/A

### How was this patch tested?
Newly added tests pass.

### Was this patch authored or co-authored using generative AI tooling?
No.
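[Editor's note] The shape of the fix — normalize case before matching on the type keyword — can be illustrated with a hypothetical sketch. The helper name and hint strings below are made up for illustration and are not Spark's actual error framework:

```java
import java.util.Locale;

public class TypeKeywordMatch {
    // Matching on the upper-cased keyword means "array", "Array", and "ARRAY"
    // all take the same branch instead of falling through to a match error.
    static String incompleteTypeHint(String keyword) {
        switch (keyword.toUpperCase(Locale.ROOT)) {
            case "ARRAY": return "ARRAY<elementType>";
            case "STRUCT": return "STRUCT<fieldName: fieldType>";
            case "MAP": return "MAP<keyType, valueType>";
            default: return "unknown type: " + keyword;
        }
    }

    public static void main(String[] args) {
        System.out.println(incompleteTypeHint("array")); // ARRAY<elementType>
        System.out.println(incompleteTypeHint("Map"));   // MAP<keyType, valueType>
    }
}
```

Without the normalization, only the exact-case spelling would be handled and every other casing would hit the default (previously, a match error).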
Re: [PR] [SPARK-47353][SQL] Enable collation support for the Mode expression using GroupMapReduce [V2] [spark]
uros-db commented on code in PR #46597: URL: https://github.com/apache/spark/pull/46597#discussion_r1605139359

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala:
## @@ -74,16 +80,28 @@ case class Mode(
 if (buffer.isEmpty) {
 return null
 }
-
+val collationAwareBuffer =
+ if (!CollationFactory.fetchCollation(collationId).supportsBinaryEquality) {
+val modeMap = buffer.toSeq.groupMapReduce {
+ case (key: String, _) =>
+CollationFactory.getCollationKey(UTF8String.fromString(key), collationId)
+ case (key: UTF8String, _) =>
+CollationFactory.getCollationKey(key, collationId)
+ case (key, _) => key
+}(x => x)((x, y) => (x._1, x._2 + y._2)).values
+modeMap
+ } else {
+buffer
+ }

Review Comment: also, you may be able to use just: `CollationFactory.getCollationKey(key.asInstanceOf[UTF8String], st.collationId)`
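[Editor's note] The groupMapReduce step being discussed — collapse buffer keys that are equal under the collation, sum their counts, then take the heaviest group — can be sketched outside Spark. Below is a hypothetical Java analogue, with a toy lowercasing `collationKey` standing in for `CollationFactory.getCollationKey` under a case-insensitive collation:

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

public class CollationAwareMode {
    // Toy stand-in for CollationFactory.getCollationKey under case-insensitive rules.
    static String collationKey(String s) { return s.toLowerCase(Locale.ROOT); }

    // Group buffer entries by collation key, sum counts within each group,
    // then return the representative value of the heaviest group.
    static String mode(Map<String, Long> buffer) {
        Map<String, Map.Entry<String, Long>> grouped = new HashMap<>();
        for (Map.Entry<String, Long> e : buffer.entrySet()) {
            grouped.merge(collationKey(e.getKey()), e,
                (a, b) -> Map.entry(a.getKey(), a.getValue() + b.getValue()));
        }
        return grouped.values().stream()
            .max((x, y) -> Long.compare(x.getValue(), y.getValue()))
            .map(Map.Entry::getKey)
            .orElse(null);
    }

    public static void main(String[] args) {
        Map<String, Long> buffer = new LinkedHashMap<>();
        buffer.put("a", 3L);
        buffer.put("b", 2L);
        buffer.put("B", 2L);
        // "b" and "B" share a collation key, so their counts merge to 4,
        // beating "a" with 3 -- mirroring the PR's utf8_binary_lcase test case.
        System.out.println(mode(buffer)); // b
    }
}
```

Under a binary collation (identity `collationKey`), the same routine would return `a`, which is the other half of the behavior the PR's tests assert.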
Re: [PR] [SPARK-47353][SQL] Enable collation support for the Mode expression using GroupMapReduce [V2] [spark]
uros-db commented on code in PR #46597: URL: https://github.com/apache/spark/pull/46597#discussion_r1605136854

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala:
## @@ -74,16 +80,28 @@ case class Mode(
 if (buffer.isEmpty) {
 return null
 }
-
+val collationAwareBuffer =
+ if (!CollationFactory.fetchCollation(collationId).supportsBinaryEquality) {
+val modeMap = buffer.toSeq.groupMapReduce {
+ case (key: String, _) =>
+CollationFactory.getCollationKey(UTF8String.fromString(key), collationId)
+ case (key: UTF8String, _) =>
+CollationFactory.getCollationKey(key, collationId)
+ case (key, _) => key
+}(x => x)((x, y) => (x._1, x._2 + y._2)).values
+modeMap
+ } else {
+buffer
+ }

Review Comment:
```suggestion
val collationAwareBuffer = child.dataType match {
  case st: StringType =>
    val modeMap = buffer.toSeq.groupMapReduce {
      case (key: String, _) =>
        CollationFactory.getCollationKey(UTF8String.fromString(key), st.collationId)
      case (key: UTF8String, _) =>
        CollationFactory.getCollationKey(key, st.collationId)
      case (key, _) => key
    }(x => x)((x, y) => (x._1, x._2 + y._2)).values
    modeMap
  case _ => buffer
}
```
(something like this)
Re: [PR] [WIP][SPARK-47227][FOLLOW][DOCS] Building Extensions [spark]
nchammas commented on code in PR #45340: URL: https://github.com/apache/spark/pull/45340#discussion_r1605122743

## docs/spark-connect-extending.md:
## @@ -0,0 +1,248 @@
+---
+layout: global
+title: Extending Spark Connect with Custom Functionality
+license: |
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+---
+
+Apache Spark provides different ways for users and developers to extend the
+system with custom functionality using many different extension points. Due to
+the separation of the client from the server, some of the extensions mechanism

Review Comment: Typo: extension[] mechanism[s]

## docs/spark-connect-extending.md:
## @@ -0,0 +1,248 @@
+---
+layout: global
+title: Extending Spark Connect with Custom Functionality
+license: |
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+---
+
+Apache Spark provides different ways for users and developers to extend the
+system with custom functionality using many different extension points. Due to
+the separation of the client from the server, some of the extensions mechanism
+are changing.
+
+This can be separated into two main categories: First, extensions that are
+purely client side and do not extend the way the Spark server is behaving.
+Second, extensions that interact directly with Spark and need to run directly on
+the driver.
+
+
+## Client Extensions
+
+- Client extensions are programs that operate purely on the client surface of
+ Apache Spark and do not require any server side changes.
+
+
+## Server Extensions
+
+One of the biggest benefits of Spark Connect is the ability to extend Apache
+Spark with extensions and then use them seamlessly in all clients from any
+programming language with relative minimal effort.
+
+Building an extension for Spark Connect is an approach aimed at enhancing the
+functionality and flexibility of Apache Spark. Spark Connect operates on three
+core primitives: relations, expressions, and commands, each serving a unique
+purpose within the data processing framework.
+
+Relations in Spark Connect are fundamental to dataset transformations, acting as
+the mechanism through which an optional input dataset is transformed into an
+output dataset. Conceptually, relations can be likened to tables within a
+database, manipulated to achieve desired outcomes. Their functionality closely
+mirrors that of the DataFrame API, providing a familiar and intuitive interface
+for data manipulation.
+
+Expressions form another critical component of Spark Connect, functioning as
+operations that can be applied to individual columns or a set of columns within
+a dataset. These expressions are akin to functions in programming, offering a
+level of granularity in data processing that is comparable to the operations
+available through the Column API. This allows users to easily extend Spark with
+custom expression functionality and make it available to all clients.
+
+Commands stand out within Spark Connect as distinct actions that can be
+executed. Unlike relations, which focus on the transformation and nesting of
+output data, commands represent singular operations that perform specific tasks
+on the data. An example of such a command is a Data Manipulation Language (DML)

Review Comment: How is DML different from a DataFrame transformation / relation? You didn't happen to mean Data Definition Language (DDL) instead, did you? e.g. `ALTER TABLE`

## docs/spark-connect-extending.md:
## @@ -0,0 +1,248 @@
+---
Re: [PR] [SPARK-47353][SQL] Enable collation support for the Mode expression using GroupMapReduce [V2] [spark]
uros-db commented on code in PR #46597: URL: https://github.com/apache/spark/pull/46597#discussion_r1605132882

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala:
## @@ -41,6 +42,11 @@ case class Mode(
 this(child, 0, 0, Some(reverse))
 }
+ final lazy val collationId: Int = child.dataType match {
+case c: StringType => c.collationId
+case _ => CollationFactory.UTF8_BINARY_COLLATION_ID
+ }

Review Comment: I see — `child` is just an `Expression` in `Mode` in this case. I would say remove `final lazy val collationId` (since it's only used in one place anyway) and just do the pattern matching below.
Re: [PR] [SPARK-48238][BUILD][YARN] Replace AmIpFilter with re-implemented YarnAMIpFilter [spark]
cloud-fan commented on code in PR #46611: URL: https://github.com/apache/spark/pull/46611#discussion_r1605076750

## resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAMIpFilter.scala:
## @@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.io.IOException
+import java.net.{HttpURLConnection, InetAddress, MalformedURLException, UnknownHostException, URL}
+import java.security.Principal
+import java.util.concurrent.TimeUnit
+
+import jakarta.servlet.{Filter, FilterChain, FilterConfig, ServletException, ServletRequest, ServletResponse}
+import jakarta.servlet.http.{HttpServletRequest, HttpServletRequestWrapper, HttpServletResponse}
+import org.apache.commons.lang3.StringUtils
+import org.apache.hadoop.security.UserGroupInformation
+
+import org.apache.spark.internal.Logging
+
+// This class is inspired by org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter

Review Comment: If the original code is Java, I'd prefer we also add a Java file in Spark, so that it's easier to keep the code in sync.
Re: [PR] [SPARK-48238][BUILD][YARN] Replace AmIpFilter with re-implemented YarnAMIpFilter [spark]
cloud-fan commented on PR #46611: URL: https://github.com/apache/spark/pull/46611#issuecomment-2117700736

This looks like a good idea to get rid of the conflicting dependency. We should clarify https://github.com/apache/spark/pull/46611#discussion_r1605001322 though.
[PR] [WIP] JDBC Connectors predicate pushdown testing [spark]
stefanbuk-db opened a new pull request, #46642: URL: https://github.com/apache/spark/pull/46642

### What changes were proposed in this pull request?
In this PR, I add a new trait with tests for integration testing of JDBC connectors. Also, I propose changes to `MsSqlServerDialect` to support more filter push downs.

### Why are the changes needed?
These changes are needed for better testing of JDBC connectors and general improvements of push down capabilities.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
With added tests.

### Was this patch authored or co-authored using generative AI tooling?
No
Re: [PR] [SPARK-47353][SQL] Enable collation support for the Mode expression using GroupMapReduce [V2] [spark]
GideonPotok commented on PR #46597: URL: https://github.com/apache/spark/pull/46597#issuecomment-2117660366

@uros-db This is all cleaned up. Let's get some of the other reviewers to look at it?
Re: [PR] [SPARK-48210][DOC]Modify the description of whether dynamic partition… [spark]
tgravescs commented on PR #46496: URL: https://github.com/apache/spark/pull/46496#issuecomment-2117624849

What version of Spark are you using? That issue I pointed to was fixed in 4.0.0 and 3.5.1. The check and error that is on the main branch should read:

```
if ((notRunningUnitTests || testExceptionThrown) &&
    !(isStandaloneOrLocalCluster || isYarn || isK8s)) {
  throw new SparkException("TaskResourceProfiles are only supported for Standalone, " +
    "Yarn and Kubernetes cluster for now when dynamic allocation is disabled.")
```

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala#L73

I'm guessing you are using a version of Spark before that change.
Re: [PR] [SPARK-48238][BUILD][YARN] Replace AmIpFilter with re-implemented YarnAMIpFilter [spark]
tgravescs commented on code in PR #46611: URL: https://github.com/apache/spark/pull/46611#discussion_r1605001322

## resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAMIpFilter.scala:
## @@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.io.IOException
+import java.net.{HttpURLConnection, InetAddress, MalformedURLException, UnknownHostException, URL}
+import java.security.Principal
+import java.util.concurrent.TimeUnit
+
+import jakarta.servlet.{Filter, FilterChain, FilterConfig, ServletException, ServletRequest, ServletResponse}
+import jakarta.servlet.http.{HttpServletRequest, HttpServletRequestWrapper, HttpServletResponse}
+import org.apache.commons.lang3.StringUtils
+import org.apache.hadoop.security.UserGroupInformation
+
+import org.apache.spark.internal.Logging
+
+// This class is inspired by org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter

Review Comment: Can you clarify this: is it just a copy converted to Scala, or did you add/change functionality?
Re: [PR] [SPARK-47818][CONNECT][FOLLOW-UP] Introduce plan cache in SparkConnectPlanner to improve performance of Analyze requests [spark]
vicennial commented on PR #46638: URL: https://github.com/apache/spark/pull/46638#issuecomment-2117599067

cc @hvanhovell
[PR] [WIP][SQL] Enable hash aggregation support for all collations (StringType) [spark]
uros-db opened a new pull request, #46640: URL: https://github.com/apache/spark/pull/46640

### What changes were proposed in this pull request?
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
### Was this patch authored or co-authored using generative AI tooling?
Re: [PR] [SPARK-47353][SQL] Enable collation support for the Mode expression using GroupMapReduce [V2] [spark]
GideonPotok commented on code in PR #46597: URL: https://github.com/apache/spark/pull/46597#discussion_r1604955280

## sql/core/src/test/scala/org/apache/spark/sql/CollationSQLExpressionsSuite.scala:
## @@ -1404,6 +1408,81 @@ class CollationSQLExpressionsSuite
 })
 }
+
+ test("Support mode for string expression with collation - Basic Test") {
+Seq("utf8_binary", "utf8_binary_lcase", "unicode_ci", "unicode").foreach { collationId =>
+ val query = s"SELECT mode(collate('abc', '${collationId}'))"
+ checkAnswer(sql(query), Row("abc"))
+ assert(sql(query).schema.fields.head.dataType.sameType(StringType(collationId)))
+}
+ }
+
+ test("Support mode for string expression with collation - Advanced Test") {
+case class ModeTestCase[R](collationId: String, bufferValues: Map[String, Long], result: R)
+val testCases = Seq(
+ ModeTestCase("utf8_binary", Map("a" -> 3L, "b" -> 2L, "B" -> 2L), "a"),
+ ModeTestCase("utf8_binary_lcase", Map("a" -> 3L, "b" -> 2L, "B" -> 2L), "b"),
+ ModeTestCase("unicode_ci", Map("a" -> 3L, "b" -> 2L, "B" -> 2L), "b"),
+ ModeTestCase("unicode", Map("a" -> 3L, "b" -> 2L, "B" -> 2L), "a")
+)
+testCases.foreach(t => {
+ val valuesToAdd = t.bufferValues.map { case (elt, numRepeats) =>
+(0L to numRepeats).map(_ => s"('$elt')").mkString(",")
+ }.mkString(",")
+
+ withTable("t") {
+sql("CREATE TABLE t(i STRING) USING parquet")
+sql("INSERT INTO t VALUES " + valuesToAdd)
+val query = s"SELECT mode(collate(i, '${t.collationId}')) FROM t"
+checkAnswer(sql(query), Row(t.result))
+ assert(sql(query).schema.fields.head.dataType.sameType(StringType(t.collationId)))
+
+ }
+})
+ }
+
+ test("Support Mode.eval(buffer)") {
+case class ModeTestCase[R](
+collationId: String,
+bufferValues: Map[String, Long],
+result: R)
+case class UTF8StringModeTestCase[R](
+collationId: String,
+bufferValues: Map[UTF8String, Long],
+result: R)
+
+val bufferValues = Map("a" -> 5L, "b" -> 4L, "B" -> 3L, "d" -> 2L, "e" -> 1L)
+val testCasesStrings = Seq(ModeTestCase("utf8_binary", bufferValues, "a"),
+ ModeTestCase("utf8_binary_lcase", bufferValues, "b"),
+ ModeTestCase("unicode_ci", bufferValues, "b"),
+ ModeTestCase("unicode", bufferValues, "a"))
+
+val bufferValuesUTF8String = Map(
+ UTF8String.fromString("a") -> 5L,
+ UTF8String.fromString("b") -> 4L,
+ UTF8String.fromString("B") -> 3L,
+ UTF8String.fromString("d") -> 2L,
+ UTF8String.fromString("e") -> 1L)
+
+val testCasesUTF8String = Seq(
+ UTF8StringModeTestCase("utf8_binary", bufferValuesUTF8String, "a"),
+ UTF8StringModeTestCase("utf8_binary_lcase", bufferValuesUTF8String, "b"),
+ UTF8StringModeTestCase("unicode_ci", bufferValuesUTF8String, "b"),
+ UTF8StringModeTestCase("unicode", bufferValuesUTF8String, "a"))
+
+testCasesStrings.foreach(t => {
+ val buffer = new OpenHashMap[AnyRef, Long](5)
+ val myMode = Mode(child = Literal.create("some_column_name", StringType(t.collationId)))
+ t.bufferValues.foreach { case (k, v) => buffer.update(k, v) }
+ assert(myMode.eval(buffer).toString.toLowerCase() == t.result.toLowerCase())
+})
+
+testCasesUTF8String.foreach(t => {
+ val buffer = new OpenHashMap[AnyRef, Long](5)
+ val myMode = Mode(child = Literal.create("some_column_name", StringType(t.collationId)))
+ t.bufferValues.foreach { case (k, v) => buffer.update(k, v) }
+ assert(myMode.eval(buffer).toString.toLowerCase() == t.result.toLowerCase())
+})
+ }

Review Comment: I would love to add some E2E Benchmark tests. But it is okay if not.
Re: [PR] [SPARK-47353][SQL] Enable collation support for the Mode expression using GroupMapReduce [V2] [spark]
GideonPotok commented on code in PR #46597: URL: https://github.com/apache/spark/pull/46597#discussion_r1604955632 ## sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/CollationBenchmark.scala: ## @@ -185,6 +189,32 @@ abstract class CollationBenchmarkBase extends BenchmarkBase { } benchmark.run() } + + def benchmarkMode( Review Comment: I would love to add some E2E Benchmark tests. But it is okay if not. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47353][SQL] Enable collation support for the Mode expression using GroupMapReduce [V2] [spark]
GideonPotok commented on code in PR #46597: URL: https://github.com/apache/spark/pull/46597#discussion_r1604951157 ## sql/core/benchmarks/CollationBenchmark-jdk21-results.txt: ## @@ -1,54 +1,63 @@ -OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1018-azure +OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1021-azure AMD EPYC 7763 64-Core Processor collation unit benchmarks - equalsFunction: Best Time(ms) Avg Time(ms) Stdev(ms)Rate(M/s) Per Row(ns) Relative -- -UTF8_BINARY_LCASE2948 2958 13 0.0 29483.6 1.0X -UNICODE 2040 2042 3 0.0 20396.6 1.4X -UTF8_BINARY 2043 2043 0 0.0 20426.3 1.4X -UNICODE_CI 16318 16338 28 0.0 163178.4 0.2X +UTF8_BINARY_LCASE2896 2898 3 0.0 28958.7 1.0X +UNICODE 2038 2040 3 0.0 20377.5 1.4X +UTF8_BINARY 2053 2054 1 0.0 20534.9 1.4X +UNICODE_CI 16779 16802 34 0.0 167785.2 0.2X -OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1018-azure +OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1021-azure AMD EPYC 7763 64-Core Processor collation unit benchmarks - compareFunction: Best Time(ms) Avg Time(ms) Stdev(ms)Rate(M/s) Per Row(ns) Relative --- -UTF8_BINARY_LCASE 3227 3228 1 0.0 32272.1 1.0X -UNICODE 16637 16643 9 0.0 166367.7 0.2X -UTF8_BINARY 3132 3137 7 0.0 31319.2 1.0X -UNICODE_CI 17816 17829 18 0.0 178162.4 0.2X +UTF8_BINARY_LCASE 4705 4705 0 0.0 47048.0 1.0X +UNICODE 18863 18867 6 0.0 188625.3 0.2X +UTF8_BINARY 4894 4901 11 0.0 48936.8 1.0X +UNICODE_CI 19595 19598 4 0.0 195953.0 0.2X -OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1018-azure +OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1021-azure AMD EPYC 7763 64-Core Processor collation unit benchmarks - hashFunction: Best Time(ms) Avg Time(ms) Stdev(ms)Rate(M/s) Per Row(ns) Relative -UTF8_BINARY_LCASE 4824 4824 0 0.0 48243.7 1.0X -UNICODE 69416 69475 84 0.0 694158.3 0.1X -UTF8_BINARY3806 3808 2 0.0 38062.8 1.3X -UNICODE_CI60943 60975 45 0.0 609426.2 0.1X +UTF8_BINARY_LCASE 5011 5013 2 0.0 50113.1 1.0X +UNICODE 68309 68319 13 0.0 683094.7 0.1X 
+UTF8_BINARY3887 3887 1 0.0 38869.8 1.3X +UNICODE_CI56675 56686 15 0.0 566750.3 0.1X -OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1018-azure +OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1021-azure AMD EPYC 7763 64-Core Processor collation unit benchmarks - contains: Best Time(ms) Avg Time(ms) Stdev(ms)Rate(M/s) Per Row(ns) Relative -UTF8_BINARY_LCASE 11979 11980 1 0.0 119790.4 1.0X -UNICODE
Re: [PR] [SPARK-47353][SQL] Enable collation support for the Mode expression using GroupMapReduce [V2] [spark]
GideonPotok commented on code in PR #46597: URL: https://github.com/apache/spark/pull/46597#discussion_r1604935916 ## sql/core/benchmarks/CollationBenchmark-jdk21-results.txt: ## @@ -1,54 +1,63 @@ -OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1018-azure +OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1021-azure AMD EPYC 7763 64-Core Processor collation unit benchmarks - equalsFunction: Best Time(ms) Avg Time(ms) Stdev(ms)Rate(M/s) Per Row(ns) Relative -- -UTF8_BINARY_LCASE2948 2958 13 0.0 29483.6 1.0X -UNICODE 2040 2042 3 0.0 20396.6 1.4X -UTF8_BINARY 2043 2043 0 0.0 20426.3 1.4X -UNICODE_CI 16318 16338 28 0.0 163178.4 0.2X +UTF8_BINARY_LCASE2896 2898 3 0.0 28958.7 1.0X +UNICODE 2038 2040 3 0.0 20377.5 1.4X +UTF8_BINARY 2053 2054 1 0.0 20534.9 1.4X +UNICODE_CI 16779 16802 34 0.0 167785.2 0.2X -OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1018-azure +OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1021-azure AMD EPYC 7763 64-Core Processor collation unit benchmarks - compareFunction: Best Time(ms) Avg Time(ms) Stdev(ms)Rate(M/s) Per Row(ns) Relative --- -UTF8_BINARY_LCASE 3227 3228 1 0.0 32272.1 1.0X -UNICODE 16637 16643 9 0.0 166367.7 0.2X -UTF8_BINARY 3132 3137 7 0.0 31319.2 1.0X -UNICODE_CI 17816 17829 18 0.0 178162.4 0.2X +UTF8_BINARY_LCASE 4705 4705 0 0.0 47048.0 1.0X +UNICODE 18863 18867 6 0.0 188625.3 0.2X +UTF8_BINARY 4894 4901 11 0.0 48936.8 1.0X +UNICODE_CI 19595 19598 4 0.0 195953.0 0.2X -OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1018-azure +OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1021-azure AMD EPYC 7763 64-Core Processor collation unit benchmarks - hashFunction: Best Time(ms) Avg Time(ms) Stdev(ms)Rate(M/s) Per Row(ns) Relative -UTF8_BINARY_LCASE 4824 4824 0 0.0 48243.7 1.0X -UNICODE 69416 69475 84 0.0 694158.3 0.1X -UTF8_BINARY3806 3808 2 0.0 38062.8 1.3X -UNICODE_CI60943 60975 45 0.0 609426.2 0.1X +UTF8_BINARY_LCASE 5011 5013 2 0.0 50113.1 1.0X +UNICODE 68309 68319 13 0.0 683094.7 0.1X 
+UTF8_BINARY3887 3887 1 0.0 38869.8 1.3X +UNICODE_CI56675 56686 15 0.0 566750.3 0.1X -OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1018-azure +OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1021-azure AMD EPYC 7763 64-Core Processor collation unit benchmarks - contains: Best Time(ms) Avg Time(ms) Stdev(ms)Rate(M/s) Per Row(ns) Relative -UTF8_BINARY_LCASE 11979 11980 1 0.0 119790.4 1.0X -UNICODE
Re: [PR] [SPARK-47353][SQL] Enable collation support for the Mode expression using GroupMapReduce [V2] [spark]
GideonPotok commented on code in PR #46597: URL: https://github.com/apache/spark/pull/46597#discussion_r1604945722 ## sql/core/benchmarks/CollationBenchmark-jdk21-results.txt: ## @@ -1,54 +1,63 @@ -OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1018-azure +OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1021-azure AMD EPYC 7763 64-Core Processor collation unit benchmarks - equalsFunction: Best Time(ms) Avg Time(ms) Stdev(ms)Rate(M/s) Per Row(ns) Relative -- -UTF8_BINARY_LCASE2948 2958 13 0.0 29483.6 1.0X -UNICODE 2040 2042 3 0.0 20396.6 1.4X -UTF8_BINARY 2043 2043 0 0.0 20426.3 1.4X -UNICODE_CI 16318 16338 28 0.0 163178.4 0.2X +UTF8_BINARY_LCASE2896 2898 3 0.0 28958.7 1.0X +UNICODE 2038 2040 3 0.0 20377.5 1.4X +UTF8_BINARY 2053 2054 1 0.0 20534.9 1.4X +UNICODE_CI 16779 16802 34 0.0 167785.2 0.2X -OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1018-azure +OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1021-azure AMD EPYC 7763 64-Core Processor collation unit benchmarks - compareFunction: Best Time(ms) Avg Time(ms) Stdev(ms)Rate(M/s) Per Row(ns) Relative --- -UTF8_BINARY_LCASE 3227 3228 1 0.0 32272.1 1.0X -UNICODE 16637 16643 9 0.0 166367.7 0.2X -UTF8_BINARY 3132 3137 7 0.0 31319.2 1.0X -UNICODE_CI 17816 17829 18 0.0 178162.4 0.2X +UTF8_BINARY_LCASE 4705 4705 0 0.0 47048.0 1.0X +UNICODE 18863 18867 6 0.0 188625.3 0.2X +UTF8_BINARY 4894 4901 11 0.0 48936.8 1.0X +UNICODE_CI 19595 19598 4 0.0 195953.0 0.2X -OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1018-azure +OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1021-azure AMD EPYC 7763 64-Core Processor collation unit benchmarks - hashFunction: Best Time(ms) Avg Time(ms) Stdev(ms)Rate(M/s) Per Row(ns) Relative -UTF8_BINARY_LCASE 4824 4824 0 0.0 48243.7 1.0X -UNICODE 69416 69475 84 0.0 694158.3 0.1X -UTF8_BINARY3806 3808 2 0.0 38062.8 1.3X -UNICODE_CI60943 60975 45 0.0 609426.2 0.1X +UTF8_BINARY_LCASE 5011 5013 2 0.0 50113.1 1.0X +UNICODE 68309 68319 13 0.0 683094.7 0.1X 
+UTF8_BINARY3887 3887 1 0.0 38869.8 1.3X +UNICODE_CI56675 56686 15 0.0 566750.3 0.1X -OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1018-azure +OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1021-azure AMD EPYC 7763 64-Core Processor collation unit benchmarks - contains: Best Time(ms) Avg Time(ms) Stdev(ms)Rate(M/s) Per Row(ns) Relative -UTF8_BINARY_LCASE 11979 11980 1 0.0 119790.4 1.0X -UNICODE
Re: [PR] [SPARK-47353][SQL] Enable collation support for the Mode expression using GroupMapReduce [V2] [spark]
GideonPotok commented on code in PR #46597: URL: https://github.com/apache/spark/pull/46597#discussion_r1604944495 ## sql/core/benchmarks/CollationBenchmark-jdk21-results.txt: ## @@ -1,54 +1,63 @@ -OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1018-azure +OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1021-azure AMD EPYC 7763 64-Core Processor collation unit benchmarks - equalsFunction: Best Time(ms) Avg Time(ms) Stdev(ms)Rate(M/s) Per Row(ns) Relative -- -UTF8_BINARY_LCASE2948 2958 13 0.0 29483.6 1.0X -UNICODE 2040 2042 3 0.0 20396.6 1.4X -UTF8_BINARY 2043 2043 0 0.0 20426.3 1.4X -UNICODE_CI 16318 16338 28 0.0 163178.4 0.2X +UTF8_BINARY_LCASE2896 2898 3 0.0 28958.7 1.0X +UNICODE 2038 2040 3 0.0 20377.5 1.4X +UTF8_BINARY 2053 2054 1 0.0 20534.9 1.4X +UNICODE_CI 16779 16802 34 0.0 167785.2 0.2X -OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1018-azure +OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1021-azure AMD EPYC 7763 64-Core Processor collation unit benchmarks - compareFunction: Best Time(ms) Avg Time(ms) Stdev(ms)Rate(M/s) Per Row(ns) Relative --- -UTF8_BINARY_LCASE 3227 3228 1 0.0 32272.1 1.0X -UNICODE 16637 16643 9 0.0 166367.7 0.2X -UTF8_BINARY 3132 3137 7 0.0 31319.2 1.0X -UNICODE_CI 17816 17829 18 0.0 178162.4 0.2X +UTF8_BINARY_LCASE 4705 4705 0 0.0 47048.0 1.0X +UNICODE 18863 18867 6 0.0 188625.3 0.2X +UTF8_BINARY 4894 4901 11 0.0 48936.8 1.0X +UNICODE_CI 19595 19598 4 0.0 195953.0 0.2X -OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1018-azure +OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1021-azure AMD EPYC 7763 64-Core Processor collation unit benchmarks - hashFunction: Best Time(ms) Avg Time(ms) Stdev(ms)Rate(M/s) Per Row(ns) Relative -UTF8_BINARY_LCASE 4824 4824 0 0.0 48243.7 1.0X -UNICODE 69416 69475 84 0.0 694158.3 0.1X -UTF8_BINARY3806 3808 2 0.0 38062.8 1.3X -UNICODE_CI60943 60975 45 0.0 609426.2 0.1X +UTF8_BINARY_LCASE 5011 5013 2 0.0 50113.1 1.0X +UNICODE 68309 68319 13 0.0 683094.7 0.1X 
+UTF8_BINARY3887 3887 1 0.0 38869.8 1.3X +UNICODE_CI56675 56686 15 0.0 566750.3 0.1X -OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1018-azure +OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1021-azure AMD EPYC 7763 64-Core Processor collation unit benchmarks - contains: Best Time(ms) Avg Time(ms) Stdev(ms)Rate(M/s) Per Row(ns) Relative -UTF8_BINARY_LCASE 11979 11980 1 0.0 119790.4 1.0X -UNICODE
Re: [PR] [SPARK-47353][SQL] Enable collation support for the Mode expression using GroupMapReduce [V2] [spark]
GideonPotok commented on code in PR #46597: URL: https://github.com/apache/spark/pull/46597#discussion_r1604942334 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala: ## @@ -41,6 +42,11 @@ case class Mode( this(child, 0, 0, Some(reverse)) } + final lazy val collationId: Int = child.dataType match { Review Comment: if we use `asInstanceOf[StringType]` we will get an exception whenever the child is a numerical type or any non-string type. It would be ideal in the case of a non-string type if collationId could be -1 or something, but then CollationFactory.fetchCollation(collationId) would throw an exception. But it really is not precise to be relying on `!CollationFactory.fetchCollation(CollationFactory.UTF8_BINARY_COLLATION_ID).supportsBinaryEquality` when it is a non-string-type to continue with the non-collation logic. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
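The point in the comment above — that `asInstanceOf[StringType]` throws for numeric or other non-string children, while a pattern match degrades gracefully — can be sketched with simplified stand-in types (these are not Spark's real classes):

```scala
// Hedged sketch: extract a collation id only when the child is a string type.
sealed trait DataType
final case class StringType(collationId: Int) extends DataType
case object IntegerType extends DataType

def collationIdOf(dt: DataType): Option[Int] = dt match {
  case st: StringType => Some(st.collationId) // string child: use its collation
  case _              => None                 // non-string child: fall back to non-collation logic
}
```

Returning `Option[Int]` instead of a sentinel like `-1` avoids the problem the comment raises, where a fake id would still be passed to `CollationFactory.fetchCollation` and throw.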
Re: [PR] [SPARK-47353][SQL] Enable collation support for the Mode expression using GroupMapReduce [V2] [spark]
GideonPotok commented on code in PR #46597: URL: https://github.com/apache/spark/pull/46597#discussion_r1604934461 ## sql/core/benchmarks/CollationBenchmark-results.txt: ## @@ -1,54 +1,63 @@ -OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Linux 6.5.0-1018-azure +OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Linux 6.5.0-1021-azure AMD EPYC 7763 64-Core Processor collation unit benchmarks - equalsFunction: Best Time(ms) Avg Time(ms) Stdev(ms)Rate(M/s) Per Row(ns) Relative -- -UTF8_BINARY_LCASE3571 3576 7 0.0 35708.8 1.0X -UNICODE 2235 2240 7 0.0 22349.2 1.6X -UTF8_BINARY 2237 2242 6 0.0 22371.7 1.6X -UNICODE_CI 18733 18817 118 0.0 187333.8 0.2X +UTF8_BINARY_LCASE3241 3252 16 0.0 32413.8 1.0X +UNICODE 2080 2082 3 0.0 20800.9 1.6X +UTF8_BINARY 2081 2083 2 0.0 20814.2 1.6X +UNICODE_CI 17364 17384 27 0.0 173644.2 0.2X -OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Linux 6.5.0-1018-azure +OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Linux 6.5.0-1021-azure AMD EPYC 7763 64-Core Processor collation unit benchmarks - compareFunction: Best Time(ms) Avg Time(ms) Stdev(ms)Rate(M/s) Per Row(ns) Relative --- -UTF8_BINARY_LCASE 4260 4290 41 0.0 42602.6 1.0X -UNICODE 19536 19624 124 0.0 195360.2 0.2X -UTF8_BINARY 3582 3612 43 0.0 35818.5 1.2X -UNICODE_CI 20381 20454 103 0.0 203814.1 0.2X +UTF8_BINARY_LCASE 3614 3615 1 0.0 36142.6 1.0X +UNICODE 18575 18585 15 0.0 185747.7 0.2X +UTF8_BINARY 3311 3326 21 0.0 33111.6 1.1X +UNICODE_CI 19241 19249 11 0.0 192409.4 0.2X -OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Linux 6.5.0-1018-azure +OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Linux 6.5.0-1021-azure AMD EPYC 7763 64-Core Processor collation unit benchmarks - hashFunction: Best Time(ms) Avg Time(ms) Stdev(ms)Rate(M/s) Per Row(ns) Relative -UTF8_BINARY_LCASE 7347 7349 3 0.0 73467.1 1.0X -UNICODE 73462 73608 206 0.0 734623.2 0.1X -UTF8_BINARY5775 5815 57 0.0 57746.0 1.3X -UNICODE_CI57543 57619 108 0.0 575425.2 0.1X +UTF8_BINARY_LCASE 6928 6929 1 0.0 69276.9 1.0X +UNICODE 65674 65693 27 0.0 656737.6 0.1X 
+UTF8_BINARY5440 5457 23 0.0 54403.2 1.3X +UNICODE_CI60549 60605 79 0.0 605488.5 0.1X -OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Linux 6.5.0-1018-azure +OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Linux 6.5.0-1021-azure AMD EPYC 7763 64-Core Processor collation unit benchmarks - contains: Best Time(ms) Avg Time(ms) Stdev(ms)Rate(M/s) Per Row(ns) Relative -UTF8_BINARY_LCASE 15415 15424 13 0.0 154147.1 1.0X -UNICODE

Re: [PR] [SPARK-48312][SQL] Improve Alias.removeNonInheritableMetadata performance [spark]
cloud-fan closed pull request #46622: [SPARK-48312][SQL] Improve Alias.removeNonInheritableMetadata performance URL: https://github.com/apache/spark/pull/46622 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-48312][SQL] Improve Alias.removeNonInheritableMetadata performance [spark]
cloud-fan commented on PR #46622: URL: https://github.com/apache/spark/pull/46622#issuecomment-2117503274 thanks, merging to master! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-46841][SQL] Add collation support for ICU locales and collation specifiers [spark]
dbatomic commented on code in PR #46180: URL: https://github.com/apache/spark/pull/46180#discussion_r1604917892 ## connector/connect/common/src/main/protobuf/spark/connect/types.proto: ## @@ -101,7 +101,7 @@ message DataType { message String { uint32 type_variation_reference = 1; -uint32 collation_id = 2; +string collation = 2; Review Comment: And it makes sense. Collation id should be internal spark concept. Outside of spark you should always use collation name. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-46841][SQL] Add collation support for ICU locales and collation specifiers [spark]
dbatomic commented on code in PR #46180: URL: https://github.com/apache/spark/pull/46180#discussion_r1604916810 ## common/unsafe/src/test/scala/org/apache/spark/unsafe/types/CollationFactorySuite.scala: ## @@ -152,4 +231,168 @@ class CollationFactorySuite extends AnyFunSuite with Matchers { // scalastyle:ig } }) } + + test("test collation caching") { +Seq( + "UTF8_BINARY", + "UTF8_BINARY_LCASE", + "UTF8_BINARY_UCASE", + "UNICODE", + "UNICODE_LCASE", + "UNICODE_UCASE", + "UNICODE_CI", + "UNICODE_AI_CI", + "UNICODE_AI_CI_LCASE", + "UNICODE_AI_CI_UCASE" +).foreach(collationId => { + val col1 = fetchCollation(collationId) + val col2 = fetchCollation(collationId) + assert(col1 eq col2) // reference equality +}) + } + + test("collations with ICU non-root localization") { +Seq( + // language only + "en", + "en_CS", + "en_CI", + "en_AS", + "en_AI", + "en_LCASE", + "en_UCASE", + // language + 3-letter country code + "en_USA", + "en_USA_CS", + "en_USA_CI", + "en_USA_AS", + "en_USA_AI", + "en_USA_LCASE", + "en_USA_UCASE", + // language + script code + "sr_Cyrl", + "sr_Cyrl_CS", + "sr_Cyrl_CI", + "sr_Cyrl_AS", + "sr_Cyrl_AI", + "sr_Cyrl_LCASE", + "sr_Cyrl_UCASE", + // language + script code + 3-letter country code + "sr_Cyrl_SRB", + "sr_Cyrl_SRB_CS", + "sr_Cyrl_SRB_CI", + "sr_Cyrl_SRB_AS", + "sr_Cyrl_SRB_AI", + "sr_Cyrl_SRB_LCASE", + "sr_Cyrl_SRB_UCASE" +).foreach(collationICU => { + val col = fetchCollation(collationICU) + assert(col.collator.getLocale(ULocale.VALID_LOCALE) != ULocale.ROOT) +}) + } + + test("invalid names of collations with ICU non-root localization") { +Seq( + "en_US", // must use 3-letter country code + "enn", + "en_AAA", + "en_Something", + "en_Something_USA", + "en_CI_UNSPECIFIED", + "en_USA_UNSPECIFIED", + "en_USA_UNSPECIFIED_CI", + "en_INDETERMINATE", + "en_USA_INDETERMINATE", + "en_Latn_USA", // use en_USA instead + "en_Cyrl_USA", + "en_USA_AAA", + "sr_Cyrl_SRB_AAA" +).foreach(collationName => { + val error = intercept[SparkException] { 
+fetchCollation(collationName) + } + + assert(error.getErrorClass === "COLLATION_INVALID_NAME") + assert(error.getMessageParameters.asScala === Map("collationName" -> collationName)) +}) + } + + test("collations name normalization for ICU non-root localization") { +Seq( + ("en_USA", "en_USA"), + ("en_CS", "en"), + ("en_AS", "en"), + ("en_CS_AS", "en"), + ("en_AS_CS", "en"), + ("en_CI", "en_CI"), + ("en_AI", "en_AI"), + ("en_AI_CI", "en_CI_AI"), + ("en_USA_AI_CI", "en_USA_CI_AI"), + ("en_USA_LCASE_AI_CI", "en_USA_CI_AI_LCASE"), + ("en_USA_LCASE_CI_AI", "en_USA_CI_AI_LCASE"), + ("en_USA_AI_LCASE_CI", "en_USA_CI_AI_LCASE"), + ("en_USA_CI_LCASE_AI", "en_USA_CI_AI_LCASE"), + // randomized case + ("EN_USA", "en_USA"), + ("eN_usA_ci_uCASe_aI", "en_USA_CI_AI_UCASE"), + ("SR_CYRL", "sr_Cyrl"), + ("sr_cyrl_srb", "sr_Cyrl_SRB"), + ("sR_cYRl_sRb", "sr_Cyrl_SRB") +).foreach { + case (name, normalized) => +val col = fetchCollation(name) +assert(col.collationName == normalized) +} + } + + test("invalid collationId") { +val badCollationIds = Seq( + -1, // user-defined collation range + 1 << 31, // user-defined collation range + 1 << 12, // utf8-binary mandatory zero bit 12 breach + 1 << 13, // utf8-binary mandatory zero bit 13 breach + 1 << 14, // utf8-binary mandatory zero bit 14 breach + 1 << 15, // utf8-binary mandatory zero bit 15 breach + 1 << 16, // utf8-binary mandatory zero bit 16 breach + 1 << 17, // utf8-binary mandatory zero bit 17 breach + 1 << 18, // utf8-binary mandatory zero bit 18 breach + 1 << 19, // utf8-binary mandatory zero bit 19 breach + 1 << 20, // utf8-binary mandatory zero bit 20 breach + 1 << 23, // utf8-binary mandatory zero bit 23 breach + 1 << 24, // utf8-binary mandatory zero bit 24 breach + 1 << 25, // utf8-binary mandatory zero bit 25 breach + 1 << 26, // utf8-binary mandatory zero bit 26 breach + (1 << 29) | (1 << 12), // ICU mandatory zero bit 12 breach + (1 << 29) | (1 << 13), // ICU mandatory zero bit 13 breach + (1 << 29) | (1 << 14), // ICU 
mandatory zero bit 14 breach + (1 << 29) | (1 << 15), // ICU mandatory zero bit 15 breach + (1 << 29) | (1 << 16), // ICU mandatory zero bit 16 breach + (1 << 29) | (1 << 17), // ICU mandatory zero bit 17 breach + (1 << 29) | (1 << 18), // ICU mandatory zero bit 18 breach + (1 << 29) | (1 << 19), // ICU mandatory zero bit 19 breach + (1 << 29) | (1
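The "invalid collationId" cases above probe mandatory-zero bits in the id's layout. A minimal sketch of that validation idea, using an assumed mask (the real bit layout is internal to `CollationFactory` and may differ):

```scala
// Sketch only: reject ids with any assumed mandatory-zero bit set.
// For the (assumed) UTF8_BINARY family the test lists bits 12..20 and 23..26.
object CollationIdCheck {
  val Utf8BinaryZeroMask: Int =
    (((1 << 9) - 1) << 12) |  // bits 12..20
    (((1 << 4) - 1) << 23)    // bits 23..26

  def isValidUtf8BinaryId(id: Int): Boolean =
    id >= 0 &&                          // negative ids (e.g. -1, 1 << 31) are user-defined/invalid
      (id & Utf8BinaryZeroMask) == 0    // no mandatory-zero bit may be set
}
```

This covers only the UTF8_BINARY checks; the `(1 << 29) | …` ICU cases in the test would need a separate mask for that family.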
[PR] [SPARK-48324][SQL] Codegen Support for `hll_sketch_estimate` and `hll_union` [spark]
panbingkun opened a new pull request, #46639: URL: https://github.com/apache/spark/pull/46639 ### What changes were proposed in this pull request? The PR adds `Codegen Support` for `hll_sketch_estimate` and `hll_union`. ### Why are the changes needed? Improve codegen coverage. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? - Add new UT. - Pass GA. ### Was this patch authored or co-authored using generative AI tooling? No. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-48175][SQL][PYTHON] Store collation information in metadata and not in type for SER/DE [spark]
stefankandic commented on PR #46280: URL: https://github.com/apache/spark/pull/46280#issuecomment-2117427722 @cloud-fan all checks passing, can we merge this? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[PR] [SPARK-47818][CONNECT][FOLLOW-UP] Introduce plan cache in SparkConnectPlanner to improve performance of Analyze requests [spark]
xi-db opened a new pull request, #46638: URL: https://github.com/apache/spark/pull/46638 ### What changes were proposed in this pull request? In [this previous PR](https://github.com/apache/spark/pull/46012), we introduced two new confs for the plan cache - a static conf `spark.connect.session.planCache.maxSize` and a dynamic conf `spark.connect.session.planCache.enabled`. The plan cache is enabled by default with size 5. In this PR, we are marking them as internal because we don't expect users to deal with them. ### Why are the changes needed? These two confs are not expected to be used under normal circumstances, and we don't need to document them on the Spark Configuration reference page. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing tests. ### Was this patch authored or co-authored using generative AI tooling? No. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
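For reference, the two confs described above could be set as follows — the static one in `spark-defaults.conf` before the Connect server starts, the dynamic one per session. Values here are illustrative (they match the stated defaults):

```properties
# Static: must be set at server startup; caps the number of cached plans per session.
spark.connect.session.planCache.maxSize   5
# Dynamic: can be toggled at runtime to disable the plan cache entirely.
spark.connect.session.planCache.enabled   true
```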
Re: [PR] [SPARK-48238][BUILD][YARN] Replace AmIpFilter with re-implemented YarnAMIpFilter [spark]
pan3793 commented on PR #46611: URL: https://github.com/apache/spark/pull/46611#issuecomment-2117258617 cc @cloud-fan @dongjoon-hyun @LuciferYang @srowen -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[PR] [SPARK-48323][SQL] DB2: Map BooleanType to BOOLEAN instead of CHAR(1) [spark]
yaooqinn opened a new pull request, #46637: URL: https://github.com/apache/spark/pull/46637 ### What changes were proposed in this pull request? This PR maps BooleanType to BOOLEAN instead of CHAR(1) when writing DB2 tables; users can restore the old behavior by setting spark.sql.legacy.db2.booleanTypeMapping.enabled to true. ### Why are the changes needed? DB2 has supported BOOLEAN since v9.7, which is already EOL. It's reasonable to map BooleanType to BOOLEAN. ### Does this PR introduce _any_ user-facing change? Yes, spark.sql.legacy.db2.booleanTypeMapping.enabled is provided to restore the old behavior. ### How was this patch tested? New tests. ### Was this patch authored or co-authored using generative AI tooling? No
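The mapping change above can be sketched as a minimal standalone helper (a hypothetical illustration, not the PR's actual JdbcDialect code; the boolean flag stands in for spark.sql.legacy.db2.booleanTypeMapping.enabled):

```java
// Sketch: choose the DB2 column type for Spark's BooleanType.
// `legacyBooleanTypeMapping` models spark.sql.legacy.db2.booleanTypeMapping.enabled.
final class Db2BooleanMappingSketch {
    static String db2TypeForBoolean(boolean legacyBooleanTypeMapping) {
        // The legacy behavior wrote CHAR(1); the new default writes BOOLEAN,
        // which DB2 supports natively since v9.7.
        return legacyBooleanTypeMapping ? "CHAR(1)" : "BOOLEAN";
    }
}
```

With the flag off (the new default) a Spark boolean column lands in DB2 as BOOLEAN; turning the flag on restores the old CHAR(1) mapping.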
Re: [PR] [SPARK-48321][CONNECT][TESTS] Avoid using deprecated methods in dsl [spark]
zhengruifeng commented on PR #46635: URL: https://github.com/apache/spark/pull/46635#issuecomment-2117212987 merged to master
Re: [PR] [SPARK-48321][CONNECT][TESTS] Avoid using deprecated methods in dsl [spark]
zhengruifeng closed pull request #46635: [SPARK-48321][CONNECT][TESTS] Avoid using deprecated methods in dsl URL: https://github.com/apache/spark/pull/46635
Re: [PR] [SPARK-47972][SQL] Restrict CAST expression for collations [spark]
mihailom-db commented on PR #46474: URL: https://github.com/apache/spark/pull/46474#issuecomment-2117197531 So, in summary, SQL will be fine, but we need to resolve the DataFrame API as well and think of the best unified solution.
Re: [PR] [SPARK-46841][SQL] Add collation support for ICU locales and collation specifiers [spark]
nikolamand-db commented on code in PR #46180: URL: https://github.com/apache/spark/pull/46180#discussion_r1604679971 ## common/unsafe/src/main/java/org/apache/spark/sql/catalyst/util/CollationFactory.java: ## @@ -117,76 +119,445 @@ public Collation( } /** - * Constructor with comparators that are inherited from the given collator. + * collation id (32-bit integer) layout: + * bit 31:0 = predefined collation, 1 = user-defined collation + * bit 30-29: 00 = utf8-binary, 01 = ICU, 10 = indeterminate (without spec implementation) + * bit 28:0 for utf8-binary / 0 = case-sensitive, 1 = case-insensitive for ICU + * bit 27:0 for utf8-binary / 0 = accent-sensitive, 1 = accent-insensitive for ICU + * bit 26-25: zeroes, reserved for punctuation sensitivity + * bit 24-23: zeroes, reserved for first letter preference + * bit 22-21: 00 = unspecified, 01 = to-lower, 10 = to-upper + * bit 20-19: zeroes, reserved for space trimming + * bit 18-17: zeroes, reserved for version + * bit 16-12: zeroes + * bit 11-0: zeroes for utf8-binary / locale id for ICU */ -public Collation( -String collationName, -Collator collator, -String version, -boolean supportsBinaryEquality, -boolean supportsBinaryOrdering, -boolean supportsLowercaseEquality) { - this( -collationName, -collator, -(s1, s2) -> collator.compare(s1.toString(), s2.toString()), -version, -s -> (long)collator.getCollationKey(s.toString()).hashCode(), -supportsBinaryEquality, -supportsBinaryOrdering, -supportsLowercaseEquality); +private abstract static class CollationSpec { + protected enum ImplementationProvider { +UTF8_BINARY, ICU, INDETERMINATE + } + + protected enum CaseSensitivity { +CS, CI + } + + protected enum AccentSensitivity { +AS, AI + } + + protected enum CaseConversion { +UNSPECIFIED, LCASE, UCASE + } + + protected static final int IMPLEMENTATION_PROVIDER_OFFSET = 29; + protected static final int IMPLEMENTATION_PROVIDER_MASK = 0b11; + protected static final int CASE_SENSITIVITY_OFFSET = 28; + protected static final int 
CASE_SENSITIVITY_MASK = 0b1; + protected static final int ACCENT_SENSITIVITY_OFFSET = 27; + protected static final int ACCENT_SENSITIVITY_MASK = 0b1; + protected static final int CASE_CONVERSION_OFFSET = 21; + protected static final int CASE_CONVERSION_MASK = 0b11; + protected static final int LOCALE_OFFSET = 0; + protected static final int LOCALE_MASK = 0x0FFF; + + protected static final int INDETERMINATE_COLLATION_ID = +ImplementationProvider.INDETERMINATE.ordinal() << IMPLEMENTATION_PROVIDER_OFFSET; + + protected final CaseSensitivity caseSensitivity; + protected final AccentSensitivity accentSensitivity; + protected final CaseConversion caseConversion; + protected final String locale; + protected final int collationId; + + protected CollationSpec( + String locale, + CaseSensitivity caseSensitivity, + AccentSensitivity accentSensitivity, + CaseConversion caseConversion) { +this.locale = locale; +this.caseSensitivity = caseSensitivity; +this.accentSensitivity = accentSensitivity; +this.caseConversion = caseConversion; +this.collationId = getCollationId(); + } + + private static final Map collationMap = new ConcurrentHashMap<>(); + + public static Collation fetchCollation(int collationId) throws SparkException { +if (collationId == UTF8_BINARY_COLLATION_ID) { + return CollationSpecUTF8Binary.UTF8_BINARY_COLLATION; +} else if (collationMap.containsKey(collationId)) { + return collationMap.get(collationId); +} else { + CollationSpec spec; + int implementationProviderOrdinal = +(collationId >> IMPLEMENTATION_PROVIDER_OFFSET) & IMPLEMENTATION_PROVIDER_MASK; + if (implementationProviderOrdinal >= ImplementationProvider.values().length) { +throw SparkException.internalError("Invalid collation implementation provider"); + } else { +ImplementationProvider implementationProvider = ImplementationProvider.values()[ + implementationProviderOrdinal]; +if (implementationProvider == ImplementationProvider.UTF8_BINARY) { + spec = 
CollationSpecUTF8Binary.fromCollationId(collationId); +} else if (implementationProvider == ImplementationProvider.ICU) { + spec = CollationSpecICU.fromCollationId(collationId); +} else { + throw SparkException.internalError("Cannot instantiate indeterminate collation"); +} +Collation collation = spec.buildCollation(); +
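The collation id bit layout documented in the quoted hunk can be exercised with a small standalone sketch (my own illustration reusing the offsets and masks from the diff; not code from the PR):

```java
// Unpacks fields of a 32-bit collation id per the layout quoted above:
// bits 30-29 implementation provider, bit 28 case sensitivity,
// bit 27 accent sensitivity, bits 11-0 locale id (for ICU collations).
final class CollationIdLayoutSketch {
    static final int IMPLEMENTATION_PROVIDER_OFFSET = 29;
    static final int IMPLEMENTATION_PROVIDER_MASK = 0b11;
    static final int CASE_SENSITIVITY_OFFSET = 28;
    static final int CASE_SENSITIVITY_MASK = 0b1;
    static final int LOCALE_MASK = 0x0FFF;

    // Returns the ordinal of the ImplementationProvider enum (0 = UTF8_BINARY, 1 = ICU).
    static int implementationProvider(int collationId) {
        return (collationId >> IMPLEMENTATION_PROVIDER_OFFSET) & IMPLEMENTATION_PROVIDER_MASK;
    }

    // True when bit 28 marks the collation as case-insensitive (ICU "CI").
    static boolean caseInsensitive(int collationId) {
        return ((collationId >> CASE_SENSITIVITY_OFFSET) & CASE_SENSITIVITY_MASK) == 1;
    }

    // The low 12 bits carry the ICU locale id.
    static int localeId(int collationId) {
        return collationId & LOCALE_MASK;
    }
}
```

For example, an id built as `(1 << 29) | (1 << 28) | 5` decodes to provider ordinal 1 (ICU), case-insensitive, locale id 5 — which is exactly how `fetchCollation` in the diff dispatches between `CollationSpecUTF8Binary` and `CollationSpecICU`.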
Re: [PR] [SPARK-46841][SQL] Add collation support for ICU locales and collation specifiers [spark]
dbatomic commented on code in PR #46180: URL: https://github.com/apache/spark/pull/46180#discussion_r1604678110 ## common/unsafe/src/main/java/org/apache/spark/sql/catalyst/util/CollationFactory.java: ## @@ -245,29 +599,26 @@ public static StringSearch getStringSearch( * Returns the collation id for the given collation name. */ public static int collationNameToId(String collationName) throws SparkException { -String normalizedName = collationName.toUpperCase(); -if (collationNameToIdMap.containsKey(normalizedName)) { - return collationNameToIdMap.get(normalizedName); -} else { - Collation suggestion = Collections.min(List.of(collationTable), Comparator.comparingInt( -c -> UTF8String.fromString(c.collationName).levenshteinDistance( - UTF8String.fromString(normalizedName; - - Map params = new HashMap<>(); - params.put("collationName", collationName); - params.put("proposal", suggestion.collationName); - - throw new SparkException( -"COLLATION_INVALID_NAME", SparkException.constructMessageParams(params), null); -} +return Collation.CollationSpec.collationNameToId(collationName); + } + + public static Collation fetchCollationUnsafe(int collationId) throws SparkException { +return Collation.CollationSpec.fetchCollation(collationId); } public static Collation fetchCollation(int collationId) { -return collationTable[collationId]; +try { + return fetchCollationUnsafe(collationId); +} catch (SparkException e) { + return Collation.CollationSpecUTF8Binary.UTF8_BINARY_COLLATION; +} Review Comment: TBH, I don't like this either. Swallowing a generic exception and returning a default seems like a big red flag to me.
Re: [PR] [SPARK-47972][SQL] Restrict CAST expression for collations [spark]
mihailom-db commented on PR #46474: URL: https://github.com/apache/spark/pull/46474#issuecomment-2117193264 I would expect that behaviour, but the problem comes with the rewriting you suggested. When we create expressions, the CAST expression does not remember the identifier used during parsing, only the type that that identifier produced. I managed to block the SQL behaviour with the parsing rules, but PySpark, Spark Connect and the DataFrame API pose a problem. They allow casting with types like StringType("collation_name") and StringType(), and in Python, as opposed to Scala, we cannot differentiate StringType() from StringType("UTF8_BINARY"), and protobuf always sends the information as StringType("UTF8_BINARY"). Even if we only block the SQL syntax, a problem arises if someone uses the DataFrame API, because the session-level default collation can be changed and the collation resolution rules would then be impossible to apply: that cast would be translated to cast(expression, StringType(session_level_default_collation)), and we have no way of differentiating whether it was created by the STRING identifier or by STRING COLLATE session_level_default_collation.
Re: [PR] [SPARK-46841][SQL] Add collation support for ICU locales and collation specifiers [spark]
dbatomic commented on code in PR #46180: URL: https://github.com/apache/spark/pull/46180#discussion_r1604672125 ## common/unsafe/src/main/java/org/apache/spark/sql/catalyst/util/CollationFactory.java: ## (quotes the same collation id layout hunk as the review comment above)
Re: [PR] [SPARK-46841][SQL] Add collation support for ICU locales and collation specifiers [spark]
dbatomic commented on code in PR #46180: URL: https://github.com/apache/spark/pull/46180#discussion_r1604667645 ## common/unsafe/src/main/java/org/apache/spark/sql/catalyst/util/CollationFactory.java: ## (quotes the same collation id layout hunk as the review comment above)
Re: [PR] [SPARK-47972][SQL] Restrict CAST expression for collations [spark]
srielau commented on PR #46474: URL: https://github.com/apache/spark/pull/46474#issuecomment-2117161101 > @stefankandic SQL Standard does not support casting to collated strings. I agree with the message thing. The standard only allows for CAST(1 AS STRING) COLLATE UNICODE, so that is what we were trying to achieve with this restriction on the cast expression. The problem is that we have Spark Connect, which can call col.cast(StringType(collation_name)) or col.cast("STRING COLLATE UNICODE"), and we do not have an easy way of differentiating StringType() from a type created like StringType(UTF8_BINARY). In order to preserve the meaning of cast to STRING as default, I would need to add this flag. Another solution is to always treat user-defined casts as implicit priority. @srielau what do you think of this type of behaviour for user-defined casts? Let me see if I get this right: if we wanted to support CAST( .. AS STRING COLLATE ...), this would naturally result in a collation with IMPLICIT priority. (A CAST( .. AS STRING) would be DEFAULT, right?) The alternative would be to (semantically) rewrite it to CAST( .. AS STRING) COLLATE , and treat it as EXPLICIT. Right?
Re: [PR] [SPARK-48175][SQL][PYTHON] Store collation information in metadata and not in type for SER/DE [spark]
stefankandic commented on code in PR #46280: URL: https://github.com/apache/spark/pull/46280#discussion_r1604650741 ## common/unsafe/src/main/java/org/apache/spark/sql/catalyst/util/CollationFactory.java: ## @@ -36,11 +36,45 @@ * Provides functionality to the UTF8String object which respects defined collation settings. */ public final class CollationFactory { + + /** + * Identifier for a single collation. + */ + public static class CollationIdentifier { +public final String provider; +public final String name; +public final String version; Review Comment: that makes sense
Re: [PR] [SPARK-48175][SQL][PYTHON] Store collation information in metadata and not in type for SER/DE [spark]
stefankandic commented on code in PR #46280: URL: https://github.com/apache/spark/pull/46280#discussion_r1604650376 ## common/unsafe/src/main/java/org/apache/spark/sql/catalyst/util/CollationFactory.java: ## @@ -36,11 +36,57 @@ * Provides functionality to the UTF8String object which respects defined collation settings. */ public final class CollationFactory { + + /** + * Identifier for a single collation. + */ + public static class CollationIdentifier { +public final String provider; +public final String name; +public final String version; + +public CollationIdentifier(String provider, String collationName, String version) { + this.provider = provider; + this.name = collationName; + this.version = version; +} + +public static CollationIdentifier fromString(String identifier) { + long numDots = identifier.chars().filter(ch -> ch == '.').count(); + assert(numDots > 0); + + if (numDots == 1) { +String[] parts = identifier.split("\\.", 2); +return new CollationIdentifier(parts[0], parts[1], null); + } + + String[] parts = identifier.split("\\.", 3); + return new CollationIdentifier(parts[0], parts[1], parts[2]); +} + +@Override +public String toString() { + if (version != null) { +return String.format("%s.%s.%s", provider, name, version); + } + + return toStringWithoutVersion(); +} + +/** + * Returns the identifier's string value without the version. + */ +public String toStringWithoutVersion() { Review Comment: added
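The `fromString` split logic quoted above can be illustrated with a self-contained sketch (my own restatement of the quoted hunk, not the PR's class itself):

```java
// Parses "provider.name[.version]" the way the quoted fromString does:
// splitting with a limit so a version that itself contains dots stays whole.
final class CollationIdentifierSketch {
    final String provider;
    final String name;
    final String version; // null when the identifier has no version part

    CollationIdentifierSketch(String identifier) {
        long numDots = identifier.chars().filter(ch -> ch == '.').count();
        if (numDots == 1) {
            // provider.name
            String[] parts = identifier.split("\\.", 2);
            provider = parts[0]; name = parts[1]; version = null;
        } else {
            // provider.name.version — limit 3 keeps dots inside the version
            String[] parts = identifier.split("\\.", 3);
            provider = parts[0]; name = parts[1]; version = parts[2];
        }
    }
}
```

For instance, "icu.UNICODE_CI.75.1" parses to provider "icu", name "UNICODE_CI" and version "75.1", because the limit-3 split stops consuming separators after the second dot.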
Re: [PR] [SPARK-48000][SQL] Enable hash join support for all collations (StringType) [spark]
dbatomic commented on code in PR #46599: URL: https://github.com/apache/spark/pull/46599#discussion_r1604648152 ## sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala: ## @@ -768,7 +768,7 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper { }) } - test("hash based joins not allowed for non-binary collated strings") { + test("hash based joins are also allowed for non-binary collated strings") { Review Comment: Feel free to remove this test if `hash join should be used for collated strings` is enough. In tests we shouldn't care about history - i.e. it is not important that once upon a time we didn't support hash joins and now we do. Tests should just assert that current behaviour is correct.
Re: [PR] [SPARK-48000][SQL] Enable hash join support for all collations (StringType) [spark]
dbatomic commented on code in PR #46599: URL: https://github.com/apache/spark/pull/46599#discussion_r1604643347 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CollationKey.scala: ## @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} +import org.apache.spark.sql.catalyst.util.CollationFactory +import org.apache.spark.sql.internal.types.StringTypeAnyCollation +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String + +case class CollationKey(expr: Expression) extends UnaryExpression with ExpectsInputTypes { + override def inputTypes: Seq[AbstractDataType] = Seq(StringTypeAnyCollation) + override def dataType: DataType = BinaryType + + final lazy val collationId: Int = expr.dataType match { +case st: StringType => + st.collationId + } + + override def nullSafeEval(input: Any): Any = input match { +case str: UTF8String => + CollationFactory.getCollationKeyBytes(str, collationId) +case _ => Review Comment: I think that it doesn't make sense to ever return None here. You have guarantee that input is of a string type. 
You can just do an explicit cast: `CollationFactory.getCollationKeyBytes(input.asInstanceOf[UTF8String], collationId)`
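The idea behind `CollationKey` — mapping strings to byte keys so that equality under a collation becomes plain binary equality, and therefore hashable — can be sketched with the JDK's `java.text.Collator` (an illustration only; Spark's CollationFactory is built on ICU, not the JDK collator):

```java
import java.text.CollationKey;
import java.text.Collator;
import java.util.Locale;

// Demonstrates why hash joins work once keys are injected: two strings that
// are equal under a case-insensitive collation produce collation keys that
// compare equal, so they can be hashed and matched as ordinary bytes.
final class CollationKeyDemo {
    static CollationKey keyOf(String s) {
        Collator collator = Collator.getInstance(Locale.ROOT);
        collator.setStrength(Collator.PRIMARY); // ignore case (and accent) differences
        return collator.getCollationKey(s);
    }
}
```

Here `keyOf("ABC")` and `keyOf("abc")` compare equal, so a hash table keyed on the collation key groups them together — which is the effect of injecting `CollationKey` under the join keys in the plan.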
Re: [PR] [SPARK-47972][SQL] Restrict CAST expression for collations [spark]
mihailom-db commented on PR #46474: URL: https://github.com/apache/spark/pull/46474#issuecomment-2117149071 @stefankandic SQL Standard does not support casting to collated strings. I agree with the message thing. The standard only allows for CAST(1 AS STRING) COLLATE UNICODE, so that is what we were trying to achieve with this restriction on the cast expression. The problem is that we have Spark Connect, which can call col.cast(StringType(collation_name)) or col.cast("STRING COLLATE UNICODE"), and we do not have an easy way of differentiating StringType() from a type created like StringType(UTF8_BINARY). In order to preserve the meaning of cast to STRING as default, I would need to add this flag. Another solution is to always treat user-defined casts as implicit priority. @srielau what do you think of this type of behaviour for user-defined casts?
Re: [PR] [SPARK-48000][SQL] Enable hash join support for all collations (StringType) [spark]
dbatomic commented on code in PR #46599: URL: https://github.com/apache/spark/pull/46599#discussion_r1604640371 ## sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala: ## @@ -778,25 +778,25 @@ class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper { val df1 = spark.createDataFrame(sparkContext.parallelize(in), schema) // Binary collations are allowed to use hash join. -assert(collectFirst( - df1.hint("broadcast").join(df1, df1("col_binary") === df1("col_binary")) -.queryExecution.executedPlan) { +val bJoin = df1.hint("broadcast").join(df1, df1("col_binary") === df1("col_binary")) +val binaryJoinPlan = bJoin.queryExecution.executedPlan +assert(collectFirst(binaryJoinPlan) { case _: BroadcastHashJoinExec => () }.nonEmpty) -// Even with hint broadcast, hash join is not used for non-binary collated strings. -assert(collectFirst( - df1.hint("broadcast").join(df1, df1("col_non_binary") === df1("col_non_binary")) -.queryExecution.executedPlan) { +// Hash join is also used for non-binary collated strings. +val nbJoin = df1.hint("broadcast").join(df1, df1("col_non_binary") === df1("col_non_binary")) +val non_binaryJoinPlan = nbJoin.queryExecution.executedPlan +assert(collectFirst(non_binaryJoinPlan) { case _: BroadcastHashJoinExec => () -}.isEmpty) +}.nonEmpty) +// This is possible because CollationKey is injected into the plan. +assert(non_binaryJoinPlan.toString().contains("collationkey")) Review Comment: Can you explicitly do the check instead of toString + contains? e.g. once you get into `BroadcastHashJoinExec` you should be able to get the key. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
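A sketch of what the review comment above suggests: collect the `BroadcastHashJoinExec` node itself and pattern-match its join keys, rather than relying on `toString` + `contains`. This assumes `CollationKey` is the expression injected by the rule under test and that the join keys are exposed via `leftKeys` (as on Spark's physical hash-join nodes); it is an illustration, not the PR's actual code.

```scala
// Sketch only: grab the join node, then inspect its key expressions
// directly instead of string-matching the plan's toString output.
val hashJoin = collectFirst(non_binaryJoinPlan) {
  case j: BroadcastHashJoinExec => j
}
assert(hashJoin.nonEmpty)
// Explicit check: at least one join key is a CollationKey expression.
assert(hashJoin.get.leftKeys.exists {
  case _: CollationKey => true
  case _ => false
})
```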
Re: [PR] [SPARK-47972][SQL] Restrict CAST expression for collations [spark]
stefankandic commented on code in PR #46474: URL: https://github.com/apache/spark/pull/46474#discussion_r1604633605

## sql/api/src/main/scala/org/apache/spark/sql/catalyst/parser/DataTypeAstBuilder.scala:
## @@ -74,7 +74,10 @@ class DataTypeAstBuilder extends SqlBaseParserBaseVisitor[AnyRef] {
       case (TIMESTAMP_LTZ, Nil) => TimestampType
       case (STRING, Nil) =>
         typeCtx.children.asScala.toSeq match {
-          case Seq(_) => SqlApiConf.get.defaultStringType
+          case Seq(_) =>
+            val st = SqlApiConf.get.defaultStringType
+            st.createdAsNonCollated = true

Review Comment: can we avoid mutating the state of the defaultStringType?
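One hedged way to address this review comment: return a fresh instance carrying the flag instead of flipping state on the shared default. The `createdAsNonCollated` name comes from the diff above; the copy-style constructor is an assumption about how `StringType` could be shaped, not the actual API.

```scala
// Sketch: keep the flag in a per-parse-site instance rather than
// mutating the singleton returned by SqlApiConf.get.defaultStringType.
case Seq(_) =>
  val default = SqlApiConf.get.defaultStringType
  // Hypothetical copy-style constructor; avoids shared mutable state.
  StringType(default.collationId, createdAsNonCollated = true)
```

The design point is that `SqlApiConf.get.defaultStringType` is shared across queries, so a mutable flag on it can leak between unrelated casts.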
[PR] [SPARK-48322][SQL][CONNECT][PYTHON] Drop internal metadata in `DataFrame.schema` [spark]
zhengruifeng opened a new pull request, #46636: URL: https://github.com/apache/spark/pull/46636

### What changes were proposed in this pull request?
Drop internal metadata in `DataFrame.schema`.

### Why are the changes needed?
Internal metadata might be leaked in both Spark Connect and Spark Classic, e.g. in Spark Classic:
```
In [9]: spark.range(10).select(sf.lit(1).alias("key"), "id").groupBy("key").agg(sf.max("id")).schema.json()
Out[9]: '{"fields":[{"metadata":{},"name":"key","nullable":false,"type":"integer"},{"metadata":{"__autoGeneratedAlias":"true"},"name":"max(id)","nullable":true,"type":"long"}],"type":"struct"}'
```
What makes it worse is that internal metadata may be leaked in different cases, so an additional `_drop_meta` needs to be added in the Pandas APIs to make assertions work.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
CI

### Was this patch authored or co-authored using generative AI tooling?
No
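A minimal sketch of the idea on the Scala side, rebuilding a `StructType` with an internal metadata key removed. The helper name is invented for illustration, and the only internal key shown is the `__autoGeneratedAlias` from the example above; the PR may strip a broader set of keys.

```scala
import org.apache.spark.sql.types.{MetadataBuilder, StructType}

// Hypothetical helper: return a copy of the schema with the internal
// "__autoGeneratedAlias" entry removed from every field's metadata.
def dropInternalMetadata(schema: StructType): StructType = {
  StructType(schema.fields.map { field =>
    val cleaned = new MetadataBuilder()
      .withMetadata(field.metadata)
      .remove("__autoGeneratedAlias")
      .build()
    field.copy(metadata = cleaned)
  })
}
```

With this shape, `DataFrame.schema` could hand out the cleaned copy while the analyzer keeps using the annotated plan internally.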
Re: [PR] [SPARK-48159][SQL] Extending support for collated strings on datetime expressions [spark]
uros-db commented on code in PR #46618: URL: https://github.com/apache/spark/pull/46618#discussion_r1604620097

## sql/core/src/test/scala/org/apache/spark/sql/CollationSQLExpressionsSuite.scala:
## @@ -1584,6 +1584,234 @@ class CollationSQLExpressionsSuite
     })
   }

+  test("CurrentTimeZone expression with collation") {
+    // Supported collations
+    Seq("UTF8_BINARY", "UTF8_BINARY_LCASE", "UNICODE", "UNICODE_CI").foreach(collationName => {
+      val query = "select current_timezone()"
+      // Result
+      withSQLConf(SqlApiConf.DEFAULT_COLLATION -> collationName) {
+        val testQuery = sql(query)
+        val dataType = StringType(collationName)
+        assertResult(dataType)(testQuery.schema.fields.head.dataType)
+      }
+    })
+  }
+
+  test("DayName expression with collation") {
+    // Supported collations
+    Seq("UTF8_BINARY", "UTF8_BINARY_LCASE", "UNICODE", "UNICODE_CI").foreach(collationName => {
+      val query = "select dayname(current_date())"
+      // Result
+      withSQLConf(SqlApiConf.DEFAULT_COLLATION -> collationName) {
+        val testQuery = sql(query)
+        val dataType = StringType(collationName)
+        assertResult(dataType)(testQuery.schema.fields.head.dataType)
+      }
+    })
+  }
+
+  test("ToUnixTimestamp expression with collation") {
+    // Supported collations
+    Seq("UTF8_BINARY", "UTF8_BINARY_LCASE", "UNICODE", "UNICODE_CI").foreach(collationName => {
+      val query =
+        s"""
+           |select to_unix_timestamp(collate('2021-01-01 00:00:00', '${collationName}'),
+           |collate('yyyy-MM-dd HH:mm:ss', '${collationName}'))
+           |""".stripMargin
+      // Result
+      val testQuery = sql(query)
+      val dataType = LongType
+      val expectedResult = 1609488000L
+      assertResult(dataType)(testQuery.schema.fields.head.dataType)
+      assertResult(expectedResult)(testQuery.collect().head.getLong(0))
+    })
+  }
+
+  test("FromUnixTime expression with collation") {
+    // Supported collations
+    Seq("UTF8_BINARY", "UTF8_BINARY_LCASE", "UNICODE", "UNICODE_CI").foreach(collationName => {
+      val query =
+        s"""
+           |select from_unixtime(1609488000, collate('yyyy-MM-dd HH:mm:ss', '${collationName}'))
+           |""".stripMargin
+      // Result
+      withSQLConf(SqlApiConf.DEFAULT_COLLATION -> collationName) {
+        val testQuery = sql(query)
+        val dataType = StringType(collationName)
+        val expectedResult = "2021-01-01 00:00:00"
+        assertResult(dataType)(testQuery.schema.fields.head.dataType)
+        assertResult(expectedResult)(testQuery.collect().head.getString(0))
+      }
+    })
+  }
+
+  test("NextDay expression with collation") {
+    // Supported collations
+    Seq("UTF8_BINARY", "UTF8_BINARY_LCASE", "UNICODE", "UNICODE_CI").foreach(collationName => {
+      val query =
+        s"""
+           |select next_day('2015-01-14', collate('TU', '${collationName}'))
+           |""".stripMargin
+      // Result
+      withSQLConf(SqlApiConf.DEFAULT_COLLATION -> collationName) {
+        val testQuery = sql(query)
+        val dataType = DateType
+        val expectedResult = "2015-01-20"
+        assertResult(dataType)(testQuery.schema.fields.head.dataType)
+        assertResult(expectedResult)(testQuery.collect().head.getDate(0).toString)
+      }
+    })
+  }
+
+  test("FromUTCTimestamp expression with collation") {
+    // Supported collations
+    Seq("UTF8_BINARY", "UTF8_BINARY_LCASE", "UNICODE", "UNICODE_CI").foreach(collationName => {
+      val query =
+        s"""
+           |select from_utc_timestamp(collate('2016-08-31', '${collationName}'),
+           |collate('Asia/Seoul', '${collationName}'))
+           |""".stripMargin
+      // Result
+      val testQuery = sql(query)
+      val dataType = TimestampType
+      val expectedResult = "2016-08-31 09:00:00.0"
+      assertResult(dataType)(testQuery.schema.fields.head.dataType)
+      assertResult(expectedResult)(testQuery.collect().head.getTimestamp(0).toString)
+    })
+  }
+
+  test("ToUTCTimestamp expression with collation") {
+    // Supported collations
+    Seq("UTF8_BINARY", "UTF8_BINARY_LCASE", "UNICODE", "UNICODE_CI").foreach(collationName => {
+      val query =
+        s"""
+           |select to_utc_timestamp(collate('2016-08-31 09:00:00', '${collationName}'),
+           |collate('Asia/Seoul', '${collationName}'))
+           |""".stripMargin
+      // Result
+      val testQuery = sql(query)
+      val dataType = TimestampType
+      val expectedResult = "2016-08-31 00:00:00.0"
+      assertResult(dataType)(testQuery.schema.fields.head.dataType)
+      assertResult(expectedResult)(testQuery.collect().head.getTimestamp(0).toString)
+    })
+  }
+
+  test("ParseToDate expression with collation") {
+    // Supported collations
+    Seq("UTF8_BINARY", "UTF8_BINARY_LCASE", "UNICODE", "UNICODE_CI").foreach(collationName => {
+      val query =
+        s"""
+           |select
Re: [PR] [SPARK-47972][SQL] Restrict CAST expression for collations [spark]
stefankandic commented on PR #46474: URL: https://github.com/apache/spark/pull/46474#issuecomment-2117114028

Why do we want to do this? Should it be a parser error? Also, I don't think the error message is very clear - we should at least let the user know how to perform the cast to another collation.
Re: [PR] [SPARK-48159][SQL] Extending support for collated strings on datetime expressions [spark]
uros-db commented on code in PR #46618: URL: https://github.com/apache/spark/pull/46618#discussion_r1604614789

## sql/core/src/test/scala/org/apache/spark/sql/CollationSQLExpressionsSuite.scala:
## @@ -1584,6 +1584,234 @@ class CollationSQLExpressionsSuite
     })
   }

+  test("CurrentTimeZone expression with collation") {
+    // Supported collations
+    Seq("UTF8_BINARY", "UTF8_BINARY_LCASE", "UNICODE", "UNICODE_CI").foreach(collationName => {

Review Comment: since we're now using this `Seq("UTF8_BINARY", "UTF8_BINARY_LCASE", "UNICODE", "UNICODE_CI")` a lot, let's separate it out and call it something like `testCollationsSeq`
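The refactor asked for above could look like the following sketch, with the `testCollationsSeq` name taken from the review comment and its placement as a suite-level field assumed for illustration:

```scala
// Shared list of collations exercised by these tests, factored out
// so each test body no longer repeats the literal Seq.
private val testCollationsSeq =
  Seq("UTF8_BINARY", "UTF8_BINARY_LCASE", "UNICODE", "UNICODE_CI")

test("CurrentTimeZone expression with collation") {
  testCollationsSeq.foreach { collationName =>
    withSQLConf(SqlApiConf.DEFAULT_COLLATION -> collationName) {
      val testQuery = sql("select current_timezone()")
      assertResult(StringType(collationName))(testQuery.schema.fields.head.dataType)
    }
  }
}
```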
Re: [PR] [SPARK-48159][SQL] Extending support for collated strings on datetime expressions [spark]
uros-db commented on code in PR #46618: URL: https://github.com/apache/spark/pull/46618#discussion_r1604612518

## sql/core/src/test/scala/org/apache/spark/sql/CollationSQLExpressionsSuite.scala:
## @@ -1584,6 +1584,234 @@ class CollationSQLExpressionsSuite
     })
   }

+  test("CurrentTimeZone expression with collation") {
+    // Supported collations
+    Seq("UTF8_BINARY", "UTF8_BINARY_LCASE", "UNICODE", "UNICODE_CI").foreach(collationName => {
+      val query = "select current_timezone()"
+      // Result

Review Comment: (goes for other similar tests too)

## sql/core/src/test/scala/org/apache/spark/sql/CollationSQLExpressionsSuite.scala:
## @@ -1584,6 +1584,234 @@ class CollationSQLExpressionsSuite
     })
   }

+  test("CurrentTimeZone expression with collation") {
+    // Supported collations
+    Seq("UTF8_BINARY", "UTF8_BINARY_LCASE", "UNICODE", "UNICODE_CI").foreach(collationName => {
+      val query = "select current_timezone()"
+      // Result
+      withSQLConf(SqlApiConf.DEFAULT_COLLATION -> collationName) {
+        val testQuery = sql(query)
+        val dataType = StringType(collationName)
+        assertResult(dataType)(testQuery.schema.fields.head.dataType)
+      }
+    })
+  }
+
+  test("DayName expression with collation") {
+    // Supported collations
+    Seq("UTF8_BINARY", "UTF8_BINARY_LCASE", "UNICODE", "UNICODE_CI").foreach(collationName => {
+      val query = "select dayname(current_date())"
+      // Result
+      withSQLConf(SqlApiConf.DEFAULT_COLLATION -> collationName) {
+        val testQuery = sql(query)
+        val dataType = StringType(collationName)
+        assertResult(dataType)(testQuery.schema.fields.head.dataType)
+      }
+    })
+  }
+
+  test("ToUnixTimestamp expression with collation") {
+    // Supported collations
+    Seq("UTF8_BINARY", "UTF8_BINARY_LCASE", "UNICODE", "UNICODE_CI").foreach(collationName => {
+      val query =
+        s"""
+           |select to_unix_timestamp(collate('2021-01-01 00:00:00', '${collationName}'),
+           |collate('yyyy-MM-dd HH:mm:ss', '${collationName}'))
+           |""".stripMargin
+      // Result
+      val testQuery = sql(query)
+      val dataType = LongType
+      val expectedResult = 1609488000L
+      assertResult(dataType)(testQuery.schema.fields.head.dataType)
+      assertResult(expectedResult)(testQuery.collect().head.getLong(0))

Review Comment: (goes for other similar tests too)
Re: [PR] [SPARK-48159][SQL] Extending support for collated strings on datetime expressions [spark]
uros-db commented on code in PR #46618: URL: https://github.com/apache/spark/pull/46618#discussion_r1604611590

## sql/core/src/test/scala/org/apache/spark/sql/CollationSQLExpressionsSuite.scala:
## @@ -1584,6 +1584,234 @@ class CollationSQLExpressionsSuite
     })
   }

+  test("CurrentTimeZone expression with collation") {
+    // Supported collations
+    Seq("UTF8_BINARY", "UTF8_BINARY_LCASE", "UNICODE", "UNICODE_CI").foreach(collationName => {
+      val query = "select current_timezone()"
+      // Result
+      withSQLConf(SqlApiConf.DEFAULT_COLLATION -> collationName) {
+        val testQuery = sql(query)
+        val dataType = StringType(collationName)
+        assertResult(dataType)(testQuery.schema.fields.head.dataType)
+      }
+    })
+  }
+
+  test("DayName expression with collation") {
+    // Supported collations
+    Seq("UTF8_BINARY", "UTF8_BINARY_LCASE", "UNICODE", "UNICODE_CI").foreach(collationName => {
+      val query = "select dayname(current_date())"
+      // Result
+      withSQLConf(SqlApiConf.DEFAULT_COLLATION -> collationName) {
+        val testQuery = sql(query)
+        val dataType = StringType(collationName)
+        assertResult(dataType)(testQuery.schema.fields.head.dataType)
+      }
+    })
+  }
+
+  test("ToUnixTimestamp expression with collation") {
+    // Supported collations
+    Seq("UTF8_BINARY", "UTF8_BINARY_LCASE", "UNICODE", "UNICODE_CI").foreach(collationName => {
+      val query =
+        s"""
+           |select to_unix_timestamp(collate('2021-01-01 00:00:00', '${collationName}'),
+           |collate('yyyy-MM-dd HH:mm:ss', '${collationName}'))
+           |""".stripMargin
+      // Result
+      val testQuery = sql(query)
+      val dataType = LongType
+      val expectedResult = 1609488000L
+      assertResult(dataType)(testQuery.schema.fields.head.dataType)
+      assertResult(expectedResult)(testQuery.collect().head.getLong(0))

Review Comment: use `checkAnswer` instead
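A sketch of the `checkAnswer`-based form the reviewer asks for, where `checkAnswer` is the standard helper in Spark's `QueryTest` suites that compares a DataFrame against expected `Row`s; the exact rewrite inside the PR may differ:

```scala
// Instead of collect().head.getLong(0) + assertResult, compare the
// whole result with the suite's checkAnswer helper.
val testQuery = sql(
  s"""
     |select to_unix_timestamp(collate('2021-01-01 00:00:00', '$collationName'),
     |collate('yyyy-MM-dd HH:mm:ss', '$collationName'))
     |""".stripMargin)
assertResult(LongType)(testQuery.schema.fields.head.dataType)
checkAnswer(testQuery, Row(1609488000L))
```

`checkAnswer` also reports a readable diff of expected versus actual rows on failure, which is why reviewers tend to prefer it over manual `collect()` assertions.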