Re: [PR] [SPARK-48238][BUILD][YARN] Replace YARN AmIpFilter with a forked implementation [spark]

2024-05-17 Thread via GitHub


pan3793 commented on PR #46611:
URL: https://github.com/apache/spark/pull/46611#issuecomment-2118631633

   @cloud-fan I updated the code to preserve the original code as much as possible, and left comments to clarify the modifications.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48105][SS][3.5] Fix the race condition between state store unloading and snapshotting [spark]

2024-05-17 Thread via GitHub


HeartSaVioR commented on PR #46415:
URL: https://github.com/apache/spark/pull/46415#issuecomment-2118631470

   Also merged into 3.4.




[PR] test maven [spark]

2024-05-17 Thread via GitHub


panbingkun opened a new pull request, #46648:
URL: https://github.com/apache/spark/pull/46648

   
   
   ### What changes were proposed in this pull request?
   
   
   
   ### Why are the changes needed?
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   
   ### How was this patch tested?
   
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   




Re: [PR] assorted copy edits to migration instructions [spark]

2024-05-17 Thread via GitHub


github-actions[bot] commented on PR #45048:
URL: https://github.com/apache/spark/pull/45048#issuecomment-2118519868

   We're closing this PR because it hasn't been updated in a while. This isn't 
a judgement on the merit of the PR in any way. It's just a way of keeping the 
PR queue manageable.
   If you'd like to revive this PR, please reopen it and ask a committer to 
remove the Stale tag!




Re: [PR] [SPARK-43829][CONNECT] Improve SparkConnectPlanner by reuse Dataset and avoid construct new Dataset [spark]

2024-05-17 Thread via GitHub


github-actions[bot] commented on PR #43473:
URL: https://github.com/apache/spark/pull/43473#issuecomment-2118519899

   We're closing this PR because it hasn't been updated in a while. This isn't 
a judgement on the merit of the PR in any way. It's just a way of keeping the 
PR queue manageable.
   If you'd like to revive this PR, please reopen it and ask a committer to 
remove the Stale tag!




Re: [PR] [SPARK-48017] Add Spark application submission worker for operator [spark-kubernetes-operator]

2024-05-17 Thread via GitHub


jiangzho commented on code in PR #10:
URL: 
https://github.com/apache/spark-kubernetes-operator/pull/10#discussion_r1605620527


##
spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorker.java:
##
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator;
+
+import java.math.BigInteger;
+import java.util.Map;
+
+import scala.Option;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.deploy.k8s.KubernetesDriverSpec;
+import org.apache.spark.deploy.k8s.submit.JavaMainAppResource;
+import org.apache.spark.deploy.k8s.submit.KubernetesDriverBuilder;
+import org.apache.spark.deploy.k8s.submit.MainAppResource;
+import org.apache.spark.deploy.k8s.submit.PythonMainAppResource;
+import org.apache.spark.deploy.k8s.submit.RMainAppResource;
+import org.apache.spark.k8s.operator.spec.ApplicationSpec;
+import org.apache.spark.k8s.operator.utils.ModelUtils;
+
+/**
+ * Similar to org.apache.spark.deploy.k8s.submit.KubernetesClientApplication. 
This reads args from
+ * SparkApplication instead of starting separate spark-submit process
+ */
+public class SparkAppSubmissionWorker {
+  // Default length limit for generated app id. Generated id is used as 
resource-prefix when
+  // user-provided id is too long for this purpose. This applied to all 
resources associated with
+  // the Spark app (including k8s service which has different naming length 
limit). This we

Review Comment:
   thanks for the catch! fixed the typo.





Re: [PR] [SPARK-48017] Add Spark application submission worker for operator [spark-kubernetes-operator]

2024-05-17 Thread via GitHub


jiangzho commented on code in PR #10:
URL: 
https://github.com/apache/spark-kubernetes-operator/pull/10#discussion_r1605620441


##
spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorker.java:
##
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator;
+
+import java.math.BigInteger;
+import java.util.Map;
+
+import scala.Option;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.deploy.k8s.KubernetesDriverSpec;
+import org.apache.spark.deploy.k8s.submit.JavaMainAppResource;
+import org.apache.spark.deploy.k8s.submit.KubernetesDriverBuilder;
+import org.apache.spark.deploy.k8s.submit.MainAppResource;
+import org.apache.spark.deploy.k8s.submit.PythonMainAppResource;
+import org.apache.spark.deploy.k8s.submit.RMainAppResource;
+import org.apache.spark.k8s.operator.spec.ApplicationSpec;
+import org.apache.spark.k8s.operator.utils.ModelUtils;
+
+/**
+ * Similar to org.apache.spark.deploy.k8s.submit.KubernetesClientApplication. 
This reads args from
+ * SparkApplication instead of starting separate spark-submit process
+ */
+public class SparkAppSubmissionWorker {
+  // Default length limit for generated app id. Generated id is used as 
resource-prefix when
+  // user-provided id is too long for this purpose. This applied to all 
resources associated with
+  // the Spark app (including k8s service which has different naming length 
limit). This we
+  // truncate the hash part to 46 chars to leave some margin for spark 
resource prefix and suffix
+  // (e.g. 'spark-', '-driver-svc' . etc)
+  public static final int DEFAULT_ID_LENGTH_LIMIT = 46;
+  // Default length limit to be applied to the hash-based part of generated id
+  public static final int DEFAULT_HASH_BASED_IDENTIFIER_LENGTH_LIMIT = 36;
+  // Radix value used when generating hash-based identifier
+  public static final int DEFAULT_ENCODE_BASE = 36;
+
+  public SparkAppResourceSpec getResourceSpec(
+  SparkApplication app, KubernetesClient client, Map<String, String> confOverrides) {
+SparkAppDriverConf appDriverConf = buildDriverConf(app, confOverrides);
+return buildResourceSpec(appDriverConf, client);
+  }
+
+  protected SparkAppDriverConf buildDriverConf(
+  SparkApplication app, Map<String, String> confOverrides) {
+ApplicationSpec applicationSpec = app.getSpec();
+SparkConf effectiveSparkConf = new SparkConf();
+if (MapUtils.isNotEmpty(applicationSpec.getSparkConf())) {
+  for (String confKey : applicationSpec.getSparkConf().keySet()) {
+effectiveSparkConf.set(confKey, 
applicationSpec.getSparkConf().get(confKey));
+  }
+}
+if (MapUtils.isNotEmpty(confOverrides)) {
+  for (Map.Entry<String, String> entry : confOverrides.entrySet()) {
+effectiveSparkConf.set(entry.getKey(), entry.getValue());
+  }
+}
+effectiveSparkConf.set("spark.kubernetes.namespace", 
app.getMetadata().getNamespace());
+MainAppResource primaryResource = new JavaMainAppResource(Option.empty());
+if (StringUtils.isNotEmpty(applicationSpec.getJars())) {
+  primaryResource = new 
JavaMainAppResource(Option.apply(applicationSpec.getJars()));
+  effectiveSparkConf.setIfMissing("spark.jars", applicationSpec.getJars());
+} else if (StringUtils.isNotEmpty(applicationSpec.getPyFiles())) {
+  primaryResource = new 
PythonMainAppResource(applicationSpec.getPyFiles());
+  effectiveSparkConf.setIfMissing("spark.submit.pyFiles", 
applicationSpec.getPyFiles());
+} else if (StringUtils.isNotEmpty(applicationSpec.getSparkRFiles())) {
+  primaryResource = new RMainAppResource(applicationSpec.getSparkRFiles());
+}
+effectiveSparkConf.setIfMissing(
+"spark.master", "k8s://https://$KUBERNETES_SERVICE_HOST:$KUBERNETES_SERVICE_PORT");

Review Comment:
   Yes - it should be possible to use a custom Cluster Manager by setting 
`spark.master`. When the master is not explicitly set in SparkConf, this would 
automatically generate the master URL based on the in-cluster 
`$KUBERNETES_SERVICE_HOST` and `$KUBERNETES_SERVICE_PORT` environment variables 
(the default shown in the diff above).

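   A minimal sketch of that fallback, assuming nothing beyond the `setIfMissing` call quoted in the diff above; the class name and the custom master URL below are illustrative only, not part of the PR:

```java
import java.util.Map;

import org.apache.spark.SparkConf;

// Illustrative sketch: an explicit spark.master (e.g. pointing at a custom
// cluster manager) wins over the in-cluster default applied via setIfMissing,
// mirroring the buildDriverConf logic quoted above.
class MasterUrlFallbackSketch {
  static SparkConf resolveMaster(Map<String, String> userConf) {
    SparkConf conf = new SparkConf(false);
    // User-provided entries (SparkApplication conf or confOverrides) go in first.
    userConf.forEach(conf::set);
    // The in-cluster URL is only a fallback; it never overrides an explicit value.
    conf.setIfMissing(
        "spark.master",
        "k8s://https://$KUBERNETES_SERVICE_HOST:$KUBERNETES_SERVICE_PORT");
    return conf;
  }

  public static void main(String[] args) {
    // Hypothetical custom cluster manager URL: the fallback is skipped.
    System.out.println(
        resolveMaster(Map.of("spark.master", "custom://manager-host:7077")).get("spark.master"));
    // No master set: the generated in-cluster URL is used.
    System.out.println(resolveMaster(Map.of()).get("spark.master"));
  }
}
```
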
Re: [PR] [SPARK-48017] Add Spark application submission worker for operator [spark-kubernetes-operator]

2024-05-17 Thread via GitHub


jiangzho commented on code in PR #10:
URL: 
https://github.com/apache/spark-kubernetes-operator/pull/10#discussion_r1605619790


##
spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorker.java:
##
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator;
+
+import java.math.BigInteger;
+import java.util.Map;
+
+import scala.Option;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.deploy.k8s.KubernetesDriverSpec;
+import org.apache.spark.deploy.k8s.submit.JavaMainAppResource;
+import org.apache.spark.deploy.k8s.submit.KubernetesDriverBuilder;
+import org.apache.spark.deploy.k8s.submit.MainAppResource;
+import org.apache.spark.deploy.k8s.submit.PythonMainAppResource;
+import org.apache.spark.deploy.k8s.submit.RMainAppResource;
+import org.apache.spark.k8s.operator.spec.ApplicationSpec;
+import org.apache.spark.k8s.operator.utils.ModelUtils;
+
+/**
+ * Similar to org.apache.spark.deploy.k8s.submit.KubernetesClientApplication. 
This reads args from
+ * SparkApplication instead of starting separate spark-submit process
+ */
+public class SparkAppSubmissionWorker {
+  // Default length limit for generated app id. Generated id is used as 
resource-prefix when
+  // user-provided id is too long for this purpose. This applied to all 
resources associated with
+  // the Spark app (including k8s service which has different naming length 
limit). This we
+  // truncate the hash part to 46 chars to leave some margin for spark 
resource prefix and suffix
+  // (e.g. 'spark-', '-driver-svc' . etc)
+  public static final int DEFAULT_ID_LENGTH_LIMIT = 46;
+  // Default length limit to be applied to the hash-based part of generated id
+  public static final int DEFAULT_HASH_BASED_IDENTIFIER_LENGTH_LIMIT = 36;
+  // Radix value used when generating hash-based identifier
+  public static final int DEFAULT_ENCODE_BASE = 36;
+
+  public SparkAppResourceSpec getResourceSpec(
+  SparkApplication app, KubernetesClient client, Map<String, String> confOverrides) {
+SparkAppDriverConf appDriverConf = buildDriverConf(app, confOverrides);
+return buildResourceSpec(appDriverConf, client);
+  }
+
+  protected SparkAppDriverConf buildDriverConf(
+  SparkApplication app, Map<String, String> confOverrides) {
+ApplicationSpec applicationSpec = app.getSpec();
+SparkConf effectiveSparkConf = new SparkConf();
+if (MapUtils.isNotEmpty(applicationSpec.getSparkConf())) {
+  for (String confKey : applicationSpec.getSparkConf().keySet()) {
+effectiveSparkConf.set(confKey, 
applicationSpec.getSparkConf().get(confKey));
+  }
+}
+if (MapUtils.isNotEmpty(confOverrides)) {
+  for (Map.Entry<String, String> entry : confOverrides.entrySet()) {
+effectiveSparkConf.set(entry.getKey(), entry.getValue());
+  }
+}
+effectiveSparkConf.set("spark.kubernetes.namespace", 
app.getMetadata().getNamespace());
+MainAppResource primaryResource = new JavaMainAppResource(Option.empty());
+if (StringUtils.isNotEmpty(applicationSpec.getJars())) {
+  primaryResource = new 
JavaMainAppResource(Option.apply(applicationSpec.getJars()));
+  effectiveSparkConf.setIfMissing("spark.jars", applicationSpec.getJars());
+} else if (StringUtils.isNotEmpty(applicationSpec.getPyFiles())) {
+  primaryResource = new 
PythonMainAppResource(applicationSpec.getPyFiles());
+  effectiveSparkConf.setIfMissing("spark.submit.pyFiles", 
applicationSpec.getPyFiles());
+} else if (StringUtils.isNotEmpty(applicationSpec.getSparkRFiles())) {
+  primaryResource = new RMainAppResource(applicationSpec.getSparkRFiles());
+}
+effectiveSparkConf.setIfMissing(
+"spark.master", "k8s://https://$KUBERNETES_SERVICE_HOST:$KUBERNETES_SERVICE_PORT");
+String appId = generateSparkAppId(app);
+effectiveSparkConf.setIfMissing("spark.app.id", appId);
+return SparkAppDriverConf.create(
+effectiveSparkConf,
+appId,
+

Re: [PR] Writing OperatorStateMetadata for the TransformWithState operator [spark]

2024-05-17 Thread via GitHub


ericm-db closed pull request #46647: Writing OperatorStateMetadata for the 
TransformWithState operator
URL: https://github.com/apache/spark/pull/46647




[PR] Writing OperatorStateMetadata for the TransformWithState operator [spark]

2024-05-17 Thread via GitHub


ericm-db opened a new pull request, #46647:
URL: https://github.com/apache/spark/pull/46647

   
   
   ### What changes were proposed in this pull request?
   
   
   
   ### Why are the changes needed?
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   
   ### How was this patch tested?
   
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   




Re: [PR] [WIP] OperatorStateMetadata [spark]

2024-05-17 Thread via GitHub


ericm-db closed pull request #46645: [WIP] OperatorStateMetadata
URL: https://github.com/apache/spark/pull/46645




Re: [PR] [SPARK-48324][SQL] Codegen Support for `hll_sketch_estimate` and `hll_union` [spark]

2024-05-17 Thread via GitHub


panbingkun commented on PR #46639:
URL: https://github.com/apache/spark/pull/46639#issuecomment-2118470856

   cc @yaooqinn @cloud-fan 




[PR] [SPARK-48328][BUILD] Upgrade `Arrow` to 16.1.0 [spark]

2024-05-17 Thread via GitHub


panbingkun opened a new pull request, #46646:
URL: https://github.com/apache/spark/pull/46646

   
   
   ### What changes were proposed in this pull request?
   
   
   
   ### Why are the changes needed?
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   
   ### How was this patch tested?
   
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   




Re: [PR] [SPARK-48294][SQL][3.5] Handle lowercase in nestedTypeMissingElementTypeError [spark]

2024-05-17 Thread via GitHub


gengliangwang closed pull request #46643: [SPARK-48294][SQL][3.5] Handle 
lowercase in nestedTypeMissingElementTypeError
URL: https://github.com/apache/spark/pull/46643




Re: [PR] [SPARK-48294][SQL][3.5] Handle lowercase in nestedTypeMissingElementTypeError [spark]

2024-05-17 Thread via GitHub


gengliangwang commented on PR #46643:
URL: https://github.com/apache/spark/pull/46643#issuecomment-2118443352

   Thanks, merging to branch-3.5




Re: [PR] [SPARK-48017] Add Spark application submission worker for operator [spark-kubernetes-operator]

2024-05-17 Thread via GitHub


dongjoon-hyun commented on code in PR #10:
URL: 
https://github.com/apache/spark-kubernetes-operator/pull/10#discussion_r1605570331


##
spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorkerTest.java:
##
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator;
+
+import static 
org.apache.spark.k8s.operator.SparkAppSubmissionWorker.DEFAULT_ID_LENGTH_LIMIT;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockConstruction;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.MockedConstruction;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.deploy.k8s.submit.JavaMainAppResource;
+import org.apache.spark.deploy.k8s.submit.PythonMainAppResource;
+import org.apache.spark.deploy.k8s.submit.RMainAppResource;
+import org.apache.spark.k8s.operator.spec.ApplicationSpec;
+import org.apache.spark.k8s.operator.status.ApplicationAttemptSummary;
+import org.apache.spark.k8s.operator.status.ApplicationStatus;
+import org.apache.spark.k8s.operator.status.AttemptInfo;
+
+class SparkAppSubmissionWorkerTest {

Review Comment:
   Please add more test coverage. For example, a corner case like 
`generateHashBasedId` whose input has very long identifiers.
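
   A rough sketch of such a corner-case test. The `generateHashBasedId` signature used below (a static helper taking identifier strings) is an assumption and may not match the actual method; the length expectation follows the `DEFAULT_ID_LENGTH_LIMIT` comment in the quoted class.

```java
package org.apache.spark.k8s.operator;

import static org.junit.jupiter.api.Assertions.assertTrue;

import org.apache.commons.lang3.RandomStringUtils;
import org.junit.jupiter.api.Test;

class GenerateHashBasedIdCornerCaseTest {
  @Test
  void hashBasedIdStaysWithinLimitForVeryLongIdentifiers() {
    // Identifiers far longer than any k8s object name (253 chars max).
    String longNamespace = RandomStringUtils.randomAlphanumeric(300);
    String longName = RandomStringUtils.randomAlphanumeric(300);
    // Assumed helper: hashes the identifiers and truncates the base-36 encoded
    // digest so the result can serve as a resource prefix.
    String id = SparkAppSubmissionWorker.generateHashBasedId(longNamespace, longName);
    assertTrue(
        id.length() <= SparkAppSubmissionWorker.DEFAULT_ID_LENGTH_LIMIT,
        "hash-based id should stay within the resource-prefix length limit");
  }
}
```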





Re: [PR] [SPARK-48017] Add Spark application submission worker for operator [spark-kubernetes-operator]

2024-05-17 Thread via GitHub


dongjoon-hyun commented on code in PR #10:
URL: 
https://github.com/apache/spark-kubernetes-operator/pull/10#discussion_r1605569613


##
spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorker.java:
##
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator;
+
+import java.math.BigInteger;
+import java.util.Map;
+
+import scala.Option;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.deploy.k8s.KubernetesDriverSpec;
+import org.apache.spark.deploy.k8s.submit.JavaMainAppResource;
+import org.apache.spark.deploy.k8s.submit.KubernetesDriverBuilder;
+import org.apache.spark.deploy.k8s.submit.MainAppResource;
+import org.apache.spark.deploy.k8s.submit.PythonMainAppResource;
+import org.apache.spark.deploy.k8s.submit.RMainAppResource;
+import org.apache.spark.k8s.operator.spec.ApplicationSpec;
+import org.apache.spark.k8s.operator.utils.ModelUtils;
+
+/**
+ * Similar to org.apache.spark.deploy.k8s.submit.KubernetesClientApplication. 
This reads args from
+ * SparkApplication instead of starting separate spark-submit process
+ */
+public class SparkAppSubmissionWorker {
+  // Default length limit for generated app id. Generated id is used as 
resource-prefix when
+  // user-provided id is too long for this purpose. This applied to all 
resources associated with
+  // the Spark app (including k8s service which has different naming length 
limit). This we
+  // truncate the hash part to 46 chars to leave some margin for spark 
resource prefix and suffix
+  // (e.g. 'spark-', '-driver-svc' . etc)
+  public static final int DEFAULT_ID_LENGTH_LIMIT = 46;
+  // Default length limit to be applied to the hash-based part of generated id
+  public static final int DEFAULT_HASH_BASED_IDENTIFIER_LENGTH_LIMIT = 36;
+  // Radix value used when generating hash-based identifier
+  public static final int DEFAULT_ENCODE_BASE = 36;
+
+  public SparkAppResourceSpec getResourceSpec(
+  SparkApplication app, KubernetesClient client, Map<String, String> confOverrides) {
+SparkAppDriverConf appDriverConf = buildDriverConf(app, confOverrides);
+return buildResourceSpec(appDriverConf, client);
+  }
+
+  protected SparkAppDriverConf buildDriverConf(
+  SparkApplication app, Map<String, String> confOverrides) {
+ApplicationSpec applicationSpec = app.getSpec();
+SparkConf effectiveSparkConf = new SparkConf();
+if (MapUtils.isNotEmpty(applicationSpec.getSparkConf())) {
+  for (String confKey : applicationSpec.getSparkConf().keySet()) {
+effectiveSparkConf.set(confKey, 
applicationSpec.getSparkConf().get(confKey));
+  }
+}
+if (MapUtils.isNotEmpty(confOverrides)) {
+  for (Map.Entry<String, String> entry : confOverrides.entrySet()) {
+effectiveSparkConf.set(entry.getKey(), entry.getValue());
+  }
+}
+effectiveSparkConf.set("spark.kubernetes.namespace", 
app.getMetadata().getNamespace());
+MainAppResource primaryResource = new JavaMainAppResource(Option.empty());
+if (StringUtils.isNotEmpty(applicationSpec.getJars())) {
+  primaryResource = new 
JavaMainAppResource(Option.apply(applicationSpec.getJars()));
+  effectiveSparkConf.setIfMissing("spark.jars", applicationSpec.getJars());
+} else if (StringUtils.isNotEmpty(applicationSpec.getPyFiles())) {
+  primaryResource = new 
PythonMainAppResource(applicationSpec.getPyFiles());
+  effectiveSparkConf.setIfMissing("spark.submit.pyFiles", 
applicationSpec.getPyFiles());
+} else if (StringUtils.isNotEmpty(applicationSpec.getSparkRFiles())) {
+  primaryResource = new RMainAppResource(applicationSpec.getSparkRFiles());
+}
+effectiveSparkConf.setIfMissing(
+"spark.master", "k8s://https://$KUBERNETES_SERVICE_HOST:$KUBERNETES_SERVICE_PORT");
+String appId = generateSparkAppId(app);
+effectiveSparkConf.setIfMissing("spark.app.id", appId);
+return SparkAppDriverConf.create(
+effectiveSparkConf,
+appId,
+

Re: [PR] [SPARK-48017] Add Spark application submission worker for operator [spark-kubernetes-operator]

2024-05-17 Thread via GitHub


dongjoon-hyun commented on code in PR #10:
URL: 
https://github.com/apache/spark-kubernetes-operator/pull/10#discussion_r1605568720


##
spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorker.java:
##
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator;
+
+import java.math.BigInteger;
+import java.util.Map;
+
+import scala.Option;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.deploy.k8s.KubernetesDriverSpec;
+import org.apache.spark.deploy.k8s.submit.JavaMainAppResource;
+import org.apache.spark.deploy.k8s.submit.KubernetesDriverBuilder;
+import org.apache.spark.deploy.k8s.submit.MainAppResource;
+import org.apache.spark.deploy.k8s.submit.PythonMainAppResource;
+import org.apache.spark.deploy.k8s.submit.RMainAppResource;
+import org.apache.spark.k8s.operator.spec.ApplicationSpec;
+import org.apache.spark.k8s.operator.utils.ModelUtils;
+
+/**
+ * Similar to org.apache.spark.deploy.k8s.submit.KubernetesClientApplication. 
This reads args from
+ * SparkApplication instead of starting separate spark-submit process
+ */
+public class SparkAppSubmissionWorker {
+  // Default length limit for generated app id. Generated id is used as 
resource-prefix when
+  // user-provided id is too long for this purpose. This applied to all 
resources associated with
+  // the Spark app (including k8s service which has different naming length 
limit). This we
+  // truncate the hash part to 46 chars to leave some margin for spark 
resource prefix and suffix
+  // (e.g. 'spark-', '-driver-svc' . etc)
+  public static final int DEFAULT_ID_LENGTH_LIMIT = 46;
+  // Default length limit to be applied to the hash-based part of generated id
+  public static final int DEFAULT_HASH_BASED_IDENTIFIER_LENGTH_LIMIT = 36;
+  // Radix value used when generating hash-based identifier
+  public static final int DEFAULT_ENCODE_BASE = 36;
+
+  public SparkAppResourceSpec getResourceSpec(
+  SparkApplication app, KubernetesClient client, Map<String, String> confOverrides) {
+SparkAppDriverConf appDriverConf = buildDriverConf(app, confOverrides);
+return buildResourceSpec(appDriverConf, client);
+  }
+
+  protected SparkAppDriverConf buildDriverConf(
+  SparkApplication app, Map<String, String> confOverrides) {
+ApplicationSpec applicationSpec = app.getSpec();
+SparkConf effectiveSparkConf = new SparkConf();
+if (MapUtils.isNotEmpty(applicationSpec.getSparkConf())) {
+  for (String confKey : applicationSpec.getSparkConf().keySet()) {
+effectiveSparkConf.set(confKey, 
applicationSpec.getSparkConf().get(confKey));
+  }
+}
+if (MapUtils.isNotEmpty(confOverrides)) {
+  for (Map.Entry<String, String> entry : confOverrides.entrySet()) {
+effectiveSparkConf.set(entry.getKey(), entry.getValue());
+  }
+}
+effectiveSparkConf.set("spark.kubernetes.namespace", 
app.getMetadata().getNamespace());
+MainAppResource primaryResource = new JavaMainAppResource(Option.empty());
+if (StringUtils.isNotEmpty(applicationSpec.getJars())) {
+  primaryResource = new 
JavaMainAppResource(Option.apply(applicationSpec.getJars()));
+  effectiveSparkConf.setIfMissing("spark.jars", applicationSpec.getJars());
+} else if (StringUtils.isNotEmpty(applicationSpec.getPyFiles())) {
+  primaryResource = new 
PythonMainAppResource(applicationSpec.getPyFiles());
+  effectiveSparkConf.setIfMissing("spark.submit.pyFiles", 
applicationSpec.getPyFiles());
+} else if (StringUtils.isNotEmpty(applicationSpec.getSparkRFiles())) {
+  primaryResource = new RMainAppResource(applicationSpec.getSparkRFiles());
+}
+effectiveSparkConf.setIfMissing(
+"spark.master", "k8s://https://$KUBERNETES_SERVICE_HOST:$KUBERNETES_SERVICE_PORT");
+String appId = generateSparkAppId(app);
+effectiveSparkConf.setIfMissing("spark.app.id", appId);
+return SparkAppDriverConf.create(
+effectiveSparkConf,
+appId,
+

Re: [PR] [SPARK-48017] Add Spark application submission worker for operator [spark-kubernetes-operator]

2024-05-17 Thread via GitHub


dongjoon-hyun commented on code in PR #10:
URL: 
https://github.com/apache/spark-kubernetes-operator/pull/10#discussion_r1605567704


##
spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorker.java:
##
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator;
+
+import java.math.BigInteger;
+import java.util.Map;
+
+import scala.Option;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.deploy.k8s.KubernetesDriverSpec;
+import org.apache.spark.deploy.k8s.submit.JavaMainAppResource;
+import org.apache.spark.deploy.k8s.submit.KubernetesDriverBuilder;
+import org.apache.spark.deploy.k8s.submit.MainAppResource;
+import org.apache.spark.deploy.k8s.submit.PythonMainAppResource;
+import org.apache.spark.deploy.k8s.submit.RMainAppResource;
+import org.apache.spark.k8s.operator.spec.ApplicationSpec;
+import org.apache.spark.k8s.operator.utils.ModelUtils;
+
+/**
+ * Similar to org.apache.spark.deploy.k8s.submit.KubernetesClientApplication. 
This reads args from
+ * SparkApplication instead of starting separate spark-submit process
+ */
+public class SparkAppSubmissionWorker {
+  // Default length limit for generated app id. Generated id is used as 
resource-prefix when
+  // user-provided id is too long for this purpose. This applied to all 
resources associated with
+  // the Spark app (including k8s service which has different naming length 
limit). This we
+  // truncate the hash part to 46 chars to leave some margin for spark 
resource prefix and suffix
+  // (e.g. 'spark-', '-driver-svc' . etc)
+  public static final int DEFAULT_ID_LENGTH_LIMIT = 46;
+  // Default length limit to be applied to the hash-based part of generated id
+  public static final int DEFAULT_HASH_BASED_IDENTIFIER_LENGTH_LIMIT = 36;
+  // Radix value used when generating hash-based identifier
+  public static final int DEFAULT_ENCODE_BASE = 36;
+
+  public SparkAppResourceSpec getResourceSpec(
+  SparkApplication app, KubernetesClient client, Map<String, String> confOverrides) {
+SparkAppDriverConf appDriverConf = buildDriverConf(app, confOverrides);
+return buildResourceSpec(appDriverConf, client);
+  }
+
+  protected SparkAppDriverConf buildDriverConf(
+  SparkApplication app, Map<String, String> confOverrides) {
+ApplicationSpec applicationSpec = app.getSpec();
+SparkConf effectiveSparkConf = new SparkConf();
+if (MapUtils.isNotEmpty(applicationSpec.getSparkConf())) {
+  for (String confKey : applicationSpec.getSparkConf().keySet()) {
+effectiveSparkConf.set(confKey, 
applicationSpec.getSparkConf().get(confKey));
+  }
+}
+if (MapUtils.isNotEmpty(confOverrides)) {
+  for (Map.Entry<String, String> entry : confOverrides.entrySet()) {
+effectiveSparkConf.set(entry.getKey(), entry.getValue());
+  }
+}
+effectiveSparkConf.set("spark.kubernetes.namespace", 
app.getMetadata().getNamespace());
+MainAppResource primaryResource = new JavaMainAppResource(Option.empty());
+if (StringUtils.isNotEmpty(applicationSpec.getJars())) {
+  primaryResource = new 
JavaMainAppResource(Option.apply(applicationSpec.getJars()));
+  effectiveSparkConf.setIfMissing("spark.jars", applicationSpec.getJars());
+} else if (StringUtils.isNotEmpty(applicationSpec.getPyFiles())) {
+  primaryResource = new 
PythonMainAppResource(applicationSpec.getPyFiles());
+  effectiveSparkConf.setIfMissing("spark.submit.pyFiles", 
applicationSpec.getPyFiles());
+} else if (StringUtils.isNotEmpty(applicationSpec.getSparkRFiles())) {
+  primaryResource = new RMainAppResource(applicationSpec.getSparkRFiles());
+}
+effectiveSparkConf.setIfMissing(
+"spark.master", "k8s://https://$KUBERNETES_SERVICE_HOST:$KUBERNETES_SERVICE_PORT");
+String appId = generateSparkAppId(app);
+effectiveSparkConf.setIfMissing("spark.app.id", appId);
+return SparkAppDriverConf.create(
+effectiveSparkConf,
+appId,
+

Re: [PR] [SPARK-48017] Add Spark application submission worker for operator [spark-kubernetes-operator]

2024-05-17 Thread via GitHub


dongjoon-hyun commented on code in PR #10:
URL: 
https://github.com/apache/spark-kubernetes-operator/pull/10#discussion_r1605566899


##
spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppResourceSpec.java:
##
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import scala.Tuple2;
+import scala.collection.immutable.HashMap;
+import scala.collection.immutable.Map;
+import scala.jdk.CollectionConverters;
+
+import io.fabric8.kubernetes.api.model.Container;
+import io.fabric8.kubernetes.api.model.ContainerBuilder;
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodBuilder;
+import lombok.Getter;
+import org.apache.commons.lang3.StringUtils;
+
+import org.apache.spark.deploy.k8s.Config;
+import org.apache.spark.deploy.k8s.Constants;
+import org.apache.spark.deploy.k8s.KubernetesDriverSpec;
+import org.apache.spark.deploy.k8s.SparkPod;
+import org.apache.spark.deploy.k8s.submit.KubernetesClientUtils;
+
+/**
+ * Resembles resources that would be directly launched by operator. Based on 
resolved
+ * org.apache.spark.deploy.k8s.KubernetesDriverSpec, it:
+ *
+ * 
+ *   Add ConfigMap as a resource for driver
+ *   Converts scala types to Java for easier reference from operator
+ * 
+ *
+ * This is not thread safe and not expected to be shared among reconciler 
threads
+ */
+public class SparkAppResourceSpec {
+  @Getter private final Pod configuredPod;
+  @Getter private final List<HasMetadata> driverPreResources;
+  @Getter private final List<HasMetadata> driverResources;
+  private final SparkAppDriverConf kubernetesDriverConf;
+
+  public SparkAppResourceSpec(
+  SparkAppDriverConf kubernetesDriverConf, KubernetesDriverSpec 
kubernetesDriverSpec) {
+this.kubernetesDriverConf = kubernetesDriverConf;
+String namespace = 
kubernetesDriverConf.sparkConf().get(Config.KUBERNETES_NAMESPACE().key());
+Map<String, String> confFilesMap =
+KubernetesClientUtils.buildSparkConfDirFilesMap(
+kubernetesDriverConf.configMapNameDriver(),
+kubernetesDriverConf.sparkConf(),
+kubernetesDriverSpec.systemProperties())
+.$plus(new Tuple2<>(Config.KUBERNETES_NAMESPACE().key(), 
namespace));
+SparkPod sparkPod = addConfigMap(kubernetesDriverSpec.pod(), confFilesMap);
+this.configuredPod =
+new PodBuilder(sparkPod.pod())
+.editSpec()
+.addToContainers(sparkPod.container())
+.endSpec()
+.build();
+this.driverPreResources =
+new ArrayList<>(
+
CollectionConverters.SeqHasAsJava(kubernetesDriverSpec.driverPreKubernetesResources())
+.asJava());
+this.driverResources =
+new ArrayList<>(
+
CollectionConverters.SeqHasAsJava(kubernetesDriverSpec.driverKubernetesResources())
+.asJava());
+this.driverResources.add(
+KubernetesClientUtils.buildConfigMap(
+kubernetesDriverConf.configMapNameDriver(), confFilesMap, new 
HashMap<>()));
+this.driverPreResources.forEach(r -> setNamespaceIfMissing(r, namespace));
+this.driverResources.forEach(r -> setNamespaceIfMissing(r, namespace));
+  }
+
+  private void setNamespaceIfMissing(HasMetadata resource, String namespace) {
+if (StringUtils.isNotEmpty(resource.getMetadata().getNamespace())) {
+  return;
+}
+resource.getMetadata().setNamespace(namespace);
+  }
+
+  private SparkPod addConfigMap(SparkPod pod, Map<String, String> confFilesMap) {
+Container containerWithVolume =
+new ContainerBuilder(pod.container())
+.addNewEnv()
+.withName(Constants.ENV_SPARK_CONF_DIR())
+.withValue(Constants.SPARK_CONF_DIR_INTERNAL())
+.endEnv()
+.addNewVolumeMount()
+.withName(Constants.SPARK_CONF_VOLUME_DRIVER())
+.withMountPath(Constants.SPARK_CONF_DIR_INTERNAL())
+.endVolumeMount()
+.build();
+Pod podWithVolume =

Review Comment:
   It would be great to have a more specific name like 

Re: [PR] [SPARK-48017] Add Spark application submission worker for operator [spark-kubernetes-operator]

2024-05-17 Thread via GitHub


dongjoon-hyun commented on code in PR #10:
URL: 
https://github.com/apache/spark-kubernetes-operator/pull/10#discussion_r1605565719


##
spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorkerTest.java:
##
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator;
+
+import static 
org.apache.spark.k8s.operator.SparkAppSubmissionWorker.DEFAULT_ID_LENGTH_LIMIT;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockConstruction;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.MockedConstruction;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.deploy.k8s.submit.JavaMainAppResource;
+import org.apache.spark.deploy.k8s.submit.PythonMainAppResource;
+import org.apache.spark.deploy.k8s.submit.RMainAppResource;
+import org.apache.spark.k8s.operator.spec.ApplicationSpec;
+import org.apache.spark.k8s.operator.status.ApplicationAttemptSummary;
+import org.apache.spark.k8s.operator.status.ApplicationStatus;
+import org.apache.spark.k8s.operator.status.AttemptInfo;
+
+class SparkAppSubmissionWorkerTest {
+  @Test
+  void buildDriverConfShouldApplySpecAndPropertiesOverride() {
+Map<SparkAppDriverConf, List<Object>> constructorArgs = new HashMap<>();
+try (MockedConstruction<SparkAppDriverConf> mocked =
+mockConstruction(
+SparkAppDriverConf.class,
+(mock, context) -> constructorArgs.put(mock, new ArrayList<>(context.arguments())))) {
+  SparkApplication mockApp = mock(SparkApplication.class);
+  ApplicationSpec mockSpec = mock(ApplicationSpec.class);
+  ObjectMeta appMeta = new 
ObjectMetaBuilder().withName("app1").withNamespace("ns1").build();
+  Map<String, String> appProps = new HashMap<>();
+  appProps.put("foo", "bar");
+  appProps.put("spark.executor.instances", "1");
+  appProps.put("spark.kubernetes.namespace", "ns2");
+  Map<String, String> overrides = new HashMap<>();
+  overrides.put("spark.executor.instances", "5");
+  overrides.put("spark.kubernetes.namespace", "ns3");

Review Comment:
   Please add new test coverage with a long namespace length.
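
   One way such a test could look, reusing the mocking pattern from the test quoted above. It assumes `buildDriverConf` works without mocking the `SparkAppDriverConf` construction, and the length check on the generated app id is a sketch of the property to pin down rather than a statement of current behaviour; additional stubbing may be needed.

```java
package org.apache.spark.k8s.operator;

import static org.apache.spark.k8s.operator.SparkAppSubmissionWorker.DEFAULT_ID_LENGTH_LIMIT;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.Collections;

import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import org.apache.spark.k8s.operator.spec.ApplicationSpec;

class SparkAppSubmissionWorkerLongNamespaceTest {
  @Test
  void buildDriverConfShouldHandleLongNamespaces() {
    SparkApplication mockApp = mock(SparkApplication.class);
    ApplicationSpec mockSpec = mock(ApplicationSpec.class);
    // Close to the k8s object-name limit of 253 characters.
    String longNamespace = RandomStringUtils.randomAlphanumeric(253).toLowerCase();
    ObjectMeta appMeta =
        new ObjectMetaBuilder().withName("app1").withNamespace(longNamespace).build();
    when(mockApp.getMetadata()).thenReturn(appMeta);
    when(mockApp.getSpec()).thenReturn(mockSpec);

    SparkAppDriverConf conf =
        new SparkAppSubmissionWorker().buildDriverConf(mockApp, Collections.emptyMap());

    // The namespace should be carried over verbatim, while the generated app id
    // (used as the resource prefix) is expected to respect DEFAULT_ID_LENGTH_LIMIT.
    Assertions.assertEquals(longNamespace, conf.sparkConf().get("spark.kubernetes.namespace"));
    Assertions.assertTrue(conf.sparkConf().get("spark.app.id").length() <= DEFAULT_ID_LENGTH_LIMIT);
  }
}
```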





Re: [PR] [SPARK-48017] Add Spark application submission worker for operator [spark-kubernetes-operator]

2024-05-17 Thread via GitHub


dongjoon-hyun commented on code in PR #10:
URL: 
https://github.com/apache/spark-kubernetes-operator/pull/10#discussion_r1605565102


##
spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppResourceSpecTest.java:
##
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.List;
+
+import scala.collection.immutable.HashMap;
+import scala.collection.immutable.Seq;
+import scala.jdk.javaapi.CollectionConverters;
+
+import io.fabric8.kubernetes.api.model.ConfigMap;
+import io.fabric8.kubernetes.api.model.Container;
+import io.fabric8.kubernetes.api.model.ContainerBuilder;
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodBuilder;
+import io.fabric8.kubernetes.api.model.Volume;
+import io.fabric8.kubernetes.api.model.VolumeMount;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.deploy.k8s.KubernetesDriverSpec;
+import org.apache.spark.deploy.k8s.SparkPod;
+
+class SparkAppResourceSpecTest {
+
+  @Test
+  void testDriverResourceIncludesConfigMap() {
+SparkAppDriverConf mockConf = mock(SparkAppDriverConf.class);
+when(mockConf.configMapNameDriver()).thenReturn("foo-configmap");
+when(mockConf.sparkConf())
+.thenReturn(new SparkConf().set("spark.kubernetes.namespace", 
"foo-namespace"));
+
+KubernetesDriverSpec mockSpec = mock(KubernetesDriverSpec.class);
+Pod driver = buildBasicPod("driver");
+SparkPod sparkPod = new SparkPod(driver, buildBasicContainer());
+
+// Add some mock resources and pre-resources
+Pod pod1 = buildBasicPod("pod-1");
+Pod pod2 = buildBasicPod("pod-2");
+List preResourceList = Collections.singletonList(pod1);
+List resourceList = Collections.singletonList(pod2);
+Seq preResourceSeq = 
CollectionConverters.asScala(preResourceList).toList();
+Seq resourceSeq = 
CollectionConverters.asScala(resourceList).toList();
+when(mockSpec.driverKubernetesResources()).thenReturn(resourceSeq);
+when(mockSpec.driverPreKubernetesResources()).thenReturn(preResourceSeq);
+when(mockSpec.pod()).thenReturn(sparkPod);
+when(mockSpec.systemProperties()).thenReturn(new HashMap<>());
+
+SparkAppResourceSpec appResourceSpec = new SparkAppResourceSpec(mockConf, 
mockSpec);
+
+Assertions.assertEquals(2, appResourceSpec.getDriverResources().size());

Review Comment:
   Shall we import `Assertions.assertEquals`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48017] Add Spark application submission worker for operator [spark-kubernetes-operator]

2024-05-17 Thread via GitHub


dongjoon-hyun commented on code in PR #10:
URL: 
https://github.com/apache/spark-kubernetes-operator/pull/10#discussion_r1605564747


##
spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppDriverConfTest.java:
##
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator;
+
+import static org.mockito.Mockito.mock;
+
+import java.util.UUID;
+
+import scala.Option;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.deploy.k8s.submit.JavaMainAppResource;
+
+class SparkAppDriverConfTest {
+  @Test
+  void testResourceNamePrefix() {
+// Resource prefix shall be deterministic per SparkApp per attempt
+SparkConf sparkConf = new SparkConf();
+sparkConf.set("foo", "bar");
+sparkConf.set("spark.executor.instances", "1");
+String appId = UUID.randomUUID().toString();
+SparkAppDriverConf sparkAppDriverConf =
+SparkAppDriverConf.create(
+sparkConf, appId, mock(JavaMainAppResource.class), "foo", null, 
Option.empty());
+String resourcePrefix = sparkAppDriverConf.resourceNamePrefix();
+Assertions.assertEquals(resourcePrefix, appId);
+
Assertions.assertTrue(sparkAppDriverConf.configMapNameDriver().contains(resourcePrefix));

Review Comment:
   Please add more test cases separately, at least for all public methods. For 
example, it would be great if `configMapNameDriver` had a new test case.
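
   A minimal sketch of a dedicated `configMapNameDriver` case, reusing the setup 
already shown in this class (the 253-character bound below is the general K8s 
object-name limit, not a value taken from this PR):
   
   ```java
   @Test
   void testConfigMapNameDriver() {
     SparkConf sparkConf = new SparkConf();
     String appId = UUID.randomUUID().toString();
     SparkAppDriverConf conf =
         SparkAppDriverConf.create(
             sparkConf, appId, mock(JavaMainAppResource.class), "foo", null, Option.empty());
     String configMapName = conf.configMapNameDriver();
     // The name should embed the deterministic resource prefix (the app id)
     // and stay within the K8s object-name limit.
     Assertions.assertTrue(configMapName.contains(appId));
     Assertions.assertTrue(configMapName.length() <= 253);
   }
   ```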



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48017] Add Spark application submission worker for operator [spark-kubernetes-operator]

2024-05-17 Thread via GitHub


dongjoon-hyun commented on code in PR #10:
URL: 
https://github.com/apache/spark-kubernetes-operator/pull/10#discussion_r1605562742


##
spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorker.java:
##
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator;
+
+import java.math.BigInteger;
+import java.util.Map;
+
+import scala.Option;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.deploy.k8s.KubernetesDriverSpec;
+import org.apache.spark.deploy.k8s.submit.JavaMainAppResource;
+import org.apache.spark.deploy.k8s.submit.KubernetesDriverBuilder;
+import org.apache.spark.deploy.k8s.submit.MainAppResource;
+import org.apache.spark.deploy.k8s.submit.PythonMainAppResource;
+import org.apache.spark.deploy.k8s.submit.RMainAppResource;
+import org.apache.spark.k8s.operator.spec.ApplicationSpec;
+import org.apache.spark.k8s.operator.utils.ModelUtils;
+
+/**
+ * Similar to org.apache.spark.deploy.k8s.submit.KubernetesClientApplication. 
This reads args from
+ * SparkApplication instead of starting separate spark-submit process
+ */
+public class SparkAppSubmissionWorker {
+  // Default length limit for generated app id. Generated id is used as 
resource-prefix when
+  // user-provided id is too long for this purpose. This applied to all 
resources associated with
+  // the Spark app (including k8s service which has different naming length 
limit). This we
+  // truncate the hash part to 46 chars to leave some margin for spark 
resource prefix and suffix
+  // (e.g. 'spark-', '-driver-svc' . etc)
+  public static final int DEFAULT_ID_LENGTH_LIMIT = 46;
+  // Default length limit to be applied to the hash-based part of generated id
+  public static final int DEFAULT_HASH_BASED_IDENTIFIER_LENGTH_LIMIT = 36;
+  // Radix value used when generating hash-based identifier
+  public static final int DEFAULT_ENCODE_BASE = 36;
+
+  public SparkAppResourceSpec getResourceSpec(
+  SparkApplication app, KubernetesClient client, Map 
confOverrides) {
+SparkAppDriverConf appDriverConf = buildDriverConf(app, confOverrides);
+return buildResourceSpec(appDriverConf, client);
+  }
+
+  protected SparkAppDriverConf buildDriverConf(
+  SparkApplication app, Map confOverrides) {
+ApplicationSpec applicationSpec = app.getSpec();
+SparkConf effectiveSparkConf = new SparkConf();
+if (MapUtils.isNotEmpty(applicationSpec.getSparkConf())) {
+  for (String confKey : applicationSpec.getSparkConf().keySet()) {
+effectiveSparkConf.set(confKey, 
applicationSpec.getSparkConf().get(confKey));
+  }
+}
+if (MapUtils.isNotEmpty(confOverrides)) {
+  for (Map.Entry entry : confOverrides.entrySet()) {
+effectiveSparkConf.set(entry.getKey(), entry.getValue());
+  }
+}
+effectiveSparkConf.set("spark.kubernetes.namespace", 
app.getMetadata().getNamespace());
+MainAppResource primaryResource = new JavaMainAppResource(Option.empty());
+if (StringUtils.isNotEmpty(applicationSpec.getJars())) {
+  primaryResource = new 
JavaMainAppResource(Option.apply(applicationSpec.getJars()));
+  effectiveSparkConf.setIfMissing("spark.jars", applicationSpec.getJars());
+} else if (StringUtils.isNotEmpty(applicationSpec.getPyFiles())) {
+  primaryResource = new 
PythonMainAppResource(applicationSpec.getPyFiles());
+  effectiveSparkConf.setIfMissing("spark.submit.pyFiles", 
applicationSpec.getPyFiles());
+} else if (StringUtils.isNotEmpty(applicationSpec.getSparkRFiles())) {
+  primaryResource = new RMainAppResource(applicationSpec.getSparkRFiles());
+}
+effectiveSparkConf.setIfMissing(
+"spark.master", 
"k8s://https://$KUBERNETES_SERVICE_HOST:$KUBERNETES_SERVICE_PORT;);
+String appId = generateSparkAppId(app);
+effectiveSparkConf.setIfMissing("spark.app.id", appId);
+return SparkAppDriverConf.create(
+effectiveSparkConf,
+appId,
+

Re: [PR] [SPARK-48017] Add Spark application submission worker for operator [spark-kubernetes-operator]

2024-05-17 Thread via GitHub


dongjoon-hyun commented on code in PR #10:
URL: 
https://github.com/apache/spark-kubernetes-operator/pull/10#discussion_r1605560525


##
spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorker.java:
##
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator;
+
+import java.math.BigInteger;
+import java.util.Map;
+
+import scala.Option;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.deploy.k8s.KubernetesDriverSpec;
+import org.apache.spark.deploy.k8s.submit.JavaMainAppResource;
+import org.apache.spark.deploy.k8s.submit.KubernetesDriverBuilder;
+import org.apache.spark.deploy.k8s.submit.MainAppResource;
+import org.apache.spark.deploy.k8s.submit.PythonMainAppResource;
+import org.apache.spark.deploy.k8s.submit.RMainAppResource;
+import org.apache.spark.k8s.operator.spec.ApplicationSpec;
+import org.apache.spark.k8s.operator.utils.ModelUtils;
+
+/**
+ * Similar to org.apache.spark.deploy.k8s.submit.KubernetesClientApplication. 
This reads args from
+ * SparkApplication instead of starting separate spark-submit process
+ */
+public class SparkAppSubmissionWorker {
+  // Default length limit for generated app id. Generated id is used as 
resource-prefix when
+  // user-provided id is too long for this purpose. This applied to all 
resources associated with
+  // the Spark app (including k8s service which has different naming length 
limit). This we
+  // truncate the hash part to 46 chars to leave some margin for spark 
resource prefix and suffix
+  // (e.g. 'spark-', '-driver-svc' . etc)
+  public static final int DEFAULT_ID_LENGTH_LIMIT = 46;
+  // Default length limit to be applied to the hash-based part of generated id
+  public static final int DEFAULT_HASH_BASED_IDENTIFIER_LENGTH_LIMIT = 36;
+  // Radix value used when generating hash-based identifier
+  public static final int DEFAULT_ENCODE_BASE = 36;
+
+  public SparkAppResourceSpec getResourceSpec(
+  SparkApplication app, KubernetesClient client, Map 
confOverrides) {
+SparkAppDriverConf appDriverConf = buildDriverConf(app, confOverrides);
+return buildResourceSpec(appDriverConf, client);
+  }
+
+  protected SparkAppDriverConf buildDriverConf(
+  SparkApplication app, Map confOverrides) {
+ApplicationSpec applicationSpec = app.getSpec();
+SparkConf effectiveSparkConf = new SparkConf();
+if (MapUtils.isNotEmpty(applicationSpec.getSparkConf())) {
+  for (String confKey : applicationSpec.getSparkConf().keySet()) {
+effectiveSparkConf.set(confKey, 
applicationSpec.getSparkConf().get(confKey));
+  }
+}
+if (MapUtils.isNotEmpty(confOverrides)) {
+  for (Map.Entry entry : confOverrides.entrySet()) {
+effectiveSparkConf.set(entry.getKey(), entry.getValue());
+  }
+}
+effectiveSparkConf.set("spark.kubernetes.namespace", 
app.getMetadata().getNamespace());
+MainAppResource primaryResource = new JavaMainAppResource(Option.empty());
+if (StringUtils.isNotEmpty(applicationSpec.getJars())) {
+  primaryResource = new 
JavaMainAppResource(Option.apply(applicationSpec.getJars()));
+  effectiveSparkConf.setIfMissing("spark.jars", applicationSpec.getJars());
+} else if (StringUtils.isNotEmpty(applicationSpec.getPyFiles())) {
+  primaryResource = new 
PythonMainAppResource(applicationSpec.getPyFiles());
+  effectiveSparkConf.setIfMissing("spark.submit.pyFiles", 
applicationSpec.getPyFiles());
+} else if (StringUtils.isNotEmpty(applicationSpec.getSparkRFiles())) {
+  primaryResource = new RMainAppResource(applicationSpec.getSparkRFiles());
+}
+effectiveSparkConf.setIfMissing(
+"spark.master", 
"k8s://https://$KUBERNETES_SERVICE_HOST:$KUBERNETES_SERVICE_PORT;);

Review Comment:
   To @jiangzho and @aaruna, as we know, Apache Spark's 
`ExternalClusterManager` allows a custom K8s-based external cluster manager. 
So, it would be great if `Spark K8s Operator` has a 

Re: [PR] [SPARK-48017] Add Spark application submission worker for operator [spark-kubernetes-operator]

2024-05-17 Thread via GitHub


dongjoon-hyun commented on code in PR #10:
URL: 
https://github.com/apache/spark-kubernetes-operator/pull/10#discussion_r160424


##
spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorker.java:
##
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator;
+
+import java.math.BigInteger;
+import java.util.Map;
+
+import scala.Option;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.deploy.k8s.KubernetesDriverSpec;
+import org.apache.spark.deploy.k8s.submit.JavaMainAppResource;
+import org.apache.spark.deploy.k8s.submit.KubernetesDriverBuilder;
+import org.apache.spark.deploy.k8s.submit.MainAppResource;
+import org.apache.spark.deploy.k8s.submit.PythonMainAppResource;
+import org.apache.spark.deploy.k8s.submit.RMainAppResource;
+import org.apache.spark.k8s.operator.spec.ApplicationSpec;
+import org.apache.spark.k8s.operator.utils.ModelUtils;
+
+/**
+ * Similar to org.apache.spark.deploy.k8s.submit.KubernetesClientApplication. 
This reads args from
+ * SparkApplication instead of starting separate spark-submit process
+ */
+public class SparkAppSubmissionWorker {
+  // Default length limit for generated app id. Generated id is used as 
resource-prefix when
+  // user-provided id is too long for this purpose. This applied to all 
resources associated with
+  // the Spark app (including k8s service which has different naming length 
limit). This we

Review Comment:
   `This we`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48017] Add Spark application submission worker for operator [spark-kubernetes-operator]

2024-05-17 Thread via GitHub


dongjoon-hyun commented on code in PR #10:
URL: 
https://github.com/apache/spark-kubernetes-operator/pull/10#discussion_r1605551694


##
spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppDriverConf.java:
##
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator;
+
+import scala.Option;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.deploy.k8s.Config;
+import org.apache.spark.deploy.k8s.KubernetesDriverConf;
+import org.apache.spark.deploy.k8s.KubernetesVolumeUtils;
+import org.apache.spark.deploy.k8s.submit.KubernetesClientUtils;
+import org.apache.spark.deploy.k8s.submit.MainAppResource;
+
+public class SparkAppDriverConf extends KubernetesDriverConf {
+  private SparkAppDriverConf(
+  SparkConf sparkConf,
+  String appId,
+  MainAppResource mainAppResource,
+  String mainClass,
+  String[] appArgs,
+  Option proxyUser) {
+super(sparkConf, appId, mainAppResource, mainClass, appArgs, proxyUser, 
null);
+  }
+
+  public static SparkAppDriverConf create(
+  SparkConf sparkConf,
+  String appId,
+  MainAppResource mainAppResource,
+  String mainClass,
+  String[] appArgs,
+  Option proxyUser) {
+// pre-create check only
+KubernetesVolumeUtils.parseVolumesWithPrefix(
+sparkConf, Config.KUBERNETES_EXECUTOR_VOLUMES_PREFIX());
+return new SparkAppDriverConf(sparkConf, appId, mainAppResource, 
mainClass, appArgs, proxyUser);
+  }
+
+  /** Application managed by operator has a deterministic prefix */
+  @Override
+  public String resourceNamePrefix() {
+return appId();
+  }
+
+  public String configMapNameDriver() {

Review Comment:
   When we make a new K8s resource name, we should guarantee that it complies 
with the K8s naming limit. 
   
   Could you add a method description stating the range of string lengths this 
method can return?
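
   As an illustration only (the exact prefix/suffix layout comes from 
`KubernetesClientUtils` and is not shown here), the method could document and 
defensively enforce the bound along these lines:
   
   ```java
   /**
    * Kubernetes object names are capped at 253 characters, so whatever
    * prefix/suffix scheme is used here should stay within that bound.
    */
   public String configMapNameDriver() {
     int maxNameLength = 253;
     // "-driver-conf-map" is an illustrative suffix, not necessarily the real one.
     String name = resourceNamePrefix() + "-driver-conf-map";
     return name.length() <= maxNameLength ? name : name.substring(0, maxNameLength);
   }
   ```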



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48017] Add Spark application submission worker for operator [spark-kubernetes-operator]

2024-05-17 Thread via GitHub


dongjoon-hyun commented on code in PR #10:
URL: 
https://github.com/apache/spark-kubernetes-operator/pull/10#discussion_r1605547679


##
spark-operator-api/src/main/java/org/apache/spark/k8s/operator/utils/ModelUtils.java:
##
@@ -107,4 +108,12 @@ public static boolean 
overrideExecutorTemplateEnabled(ApplicationSpec applicatio
 && applicationSpec.getExecutorSpec() != null
 && applicationSpec.getExecutorSpec().getPodTemplateSpec() != null;
   }
+
+  public static long getAttemptId(final SparkApplication app) {

Review Comment:
   Thank you for adding `final`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48017] Add Spark application submission worker for operator [spark-kubernetes-operator]

2024-05-17 Thread via GitHub


dongjoon-hyun commented on code in PR #10:
URL: 
https://github.com/apache/spark-kubernetes-operator/pull/10#discussion_r1605544109


##
gradle.properties:
##
@@ -18,17 +18,23 @@
 group=org.apache.spark.k8s.operator
 version=0.1.0
 
+# Caution: fabric8 version should be aligned with Spark dependency
 fabric8Version=6.12.1
 commonsLang3Version=3.14.0
 commonsIOVersion=2.16.1
 lombokVersion=1.18.32
 
+#Spark

Review Comment:
   We need a space.
   - `#Spark` -> `# Spark`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48017] Add Spark application submission worker for operator [spark-kubernetes-operator]

2024-05-17 Thread via GitHub


dongjoon-hyun commented on code in PR #10:
URL: 
https://github.com/apache/spark-kubernetes-operator/pull/10#discussion_r1605543257


##
build.gradle:
##
@@ -25,6 +25,11 @@ subprojects {
 
   repositories {
 mavenCentral()
+// This is a workaround to resolve Spark 4.0.0-preview-1
+// To be removed for official release

Review Comment:
   Apache Spark recommends to use `IDed TODO`s. Please file a JIRA issue and 
use it like the following.
   
   ```
   - // To be removed for official release
   + // TODO(SPARK-X) Use Apache Spark 4.0.0-preview1 when it's ready
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48017] Add Spark application submission worker for operator [spark-kubernetes-operator]

2024-05-17 Thread via GitHub


dongjoon-hyun commented on code in PR #10:
URL: 
https://github.com/apache/spark-kubernetes-operator/pull/10#discussion_r1605542332


##
build.gradle:
##
@@ -25,6 +25,11 @@ subprojects {
 
   repositories {
 mavenCentral()
+// This is a workaround to resolve Spark 4.0.0-preview-1

Review Comment:
   `4.0.0-preview-1` -> `4.0.0-preview1`
   
   - https://github.com/apache/spark/releases/tag/v4.0.0-preview1-rc1



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48017] Add Spark application submission worker for operator [spark-kubernetes-operator]

2024-05-17 Thread via GitHub


dongjoon-hyun commented on code in PR #10:
URL: 
https://github.com/apache/spark-kubernetes-operator/pull/10#discussion_r1605541866


##
gradle.properties:
##
@@ -18,17 +18,23 @@
 group=org.apache.spark.k8s.operator
 version=0.1.0
 
+# Caution: fabric8 version should be aligned with Spark dependency
 fabric8Version=6.12.1
 commonsLang3Version=3.14.0
 commonsIOVersion=2.16.1
 lombokVersion=1.18.32
 
+#Spark
+scalaVersion=2.13
+sparkVersion=4.0.0-preview1

Review Comment:
   > @dongjoon-hyun - can we use rc1 for submission worker for now, in order to 
unblock the operator module ?
   > 
   > I'll be more than happy to upgrade & fix possible incompatibility when RC2 
is ready.
   
   Thank you, @jiangzho. Thank you for updating to use RC1 first.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47353][SQL] Enable collation support for the Mode expression using GroupMapReduce [V2] [spark]

2024-05-17 Thread via GitHub


GideonPotok commented on code in PR #46597:
URL: https://github.com/apache/spark/pull/46597#discussion_r1605523709


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala:
##
@@ -74,16 +80,28 @@ case class Mode(
 if (buffer.isEmpty) {
   return null
 }
-
+val collationAwareBuffer =
+  if 
(!CollationFactory.fetchCollation(collationId).supportsBinaryEquality) {
+val modeMap = buffer.toSeq.groupMapReduce {
+  case (key: String, _) =>
+CollationFactory.getCollationKey(UTF8String.fromString(key), 
collationId)
+  case (key: UTF8String, _) =>
+CollationFactory.getCollationKey(key, collationId)
+  case (key, _) => key
+}(x => x)((x, y) => (x._1, x._2 + y._2)).values
+modeMap
+  } else {
+buffer
+  }

Review Comment:
   @uros-db done



##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala:
##
@@ -41,6 +42,11 @@ case class Mode(
 this(child, 0, 0, Some(reverse))
   }
 
+  final lazy val collationId: Int = child.dataType match {
+case c: StringType => c.collationId
+case _ => CollationFactory.UTF8_BINARY_COLLATION_ID
+  }

Review Comment:
   @uros-db done
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47353][SQL] Enable collation support for the Mode expression using GroupMapReduce [V2] [spark]

2024-05-17 Thread via GitHub


GideonPotok commented on PR #46597:
URL: https://github.com/apache/spark/pull/46597#issuecomment-2118330617

   > since Mode expression works with any child expression, and you 
special-cased handling Strings, how do we handle Array(String) and 
Struct(String), etc.?
   
   In my local tests, I found that Mode performs a byte-by-byte comparison for 
structs, which does not consider collation. So that is still outstanding. Good 
catch! 
   
   @uros-db There are several strategies we might adopt to handle structs with 
collation fields. I am looking into implementations. It is potentially 
straightforward, though it has some gotchas.
   
   Do you feel I should solve that in a separate PR or in this one? I assume 
you would prefer that it gets solved in this PR rather than in a follow-up PR, right? 
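
   For illustration, the recursive idea could look roughly like this (a Java 
sketch against the `CollationFactory` API referenced above; the real Mode code 
is Scala operating on Catalyst values, so the names and shapes here are 
assumptions, not code from this PR):
   
   ```java
   // Derive a grouping key for a value that may be a plain string, an array of
   // strings, or a struct flattened into a list of fields.
   static Object collationAwareKey(Object value, int collationId) {
     if (value instanceof UTF8String str) {
       return CollationFactory.getCollationKey(str, collationId);
     }
     if (value instanceof List<?> fields) {
       List<Object> keys = new ArrayList<>();
       for (Object field : fields) {
         keys.add(collationAwareKey(field, collationId)); // recurse into nested values
       }
       return keys; // List.equals then gives element-wise grouping
     }
     return value; // non-string leaves keep binary equality
   }
   ```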
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48258][PYTHON][CONNECT] Checkpoint and localCheckpoint in Spark Connect [spark]

2024-05-17 Thread via GitHub


hvanhovell commented on code in PR #46570:
URL: https://github.com/apache/spark/pull/46570#discussion_r1605509972


##
connector/connect/common/src/main/protobuf/spark/connect/base.proto:
##
@@ -199,6 +200,17 @@ message AnalyzePlanRequest {
 // (Required) The logical plan to get the storage level.
 Relation relation = 1;
   }
+
+  message Checkpoint {

Review Comment:
   Caching is lazy, so it cannot trigger execution of the main query. 
Checkpoint can actually do this, which is weird for an analyze request. 
Moreover, analyze does not work well with long-running operations (i.e. an 
eager checkpoint) for the following reasons:
   - Analyze needs a thread in the server's thread pool, which in extreme cases 
can lead to exhaustion, or in a less extreme scenario can lead to reduced 
availability.
   - Long-running requests can be killed by intermediate proxies. Analyze does 
not support reattach, which is an issue when the checkpoint takes too long. In 
that case you are basically dead in the water.
   
   The argument that ExecutePlanResponse does not support this is IMO a bit 
flimsy. Look, for example, at `SqlCommandResult`: it can be returned as part 
of the `ExecutePlanResponse` and it returns a relation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47818][CONNECT][FOLLOW-UP] Introduce plan cache in SparkConnectPlanner to improve performance of Analyze requests [spark]

2024-05-17 Thread via GitHub


hvanhovell closed pull request #46638: [SPARK-47818][CONNECT][FOLLOW-UP] 
Introduce plan cache in SparkConnectPlanner to improve performance of Analyze 
requests
URL: https://github.com/apache/spark/pull/46638


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48238][BUILD][YARN] Replace AmIpFilter with re-implemented YarnAMIpFilter [spark]

2024-05-17 Thread via GitHub


pan3793 commented on code in PR #46611:
URL: https://github.com/apache/spark/pull/46611#discussion_r1605495540


##
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAMIpFilter.scala:
##
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.io.IOException
+import java.net.{HttpURLConnection, InetAddress, MalformedURLException, 
UnknownHostException, URL}
+import java.security.Principal
+import java.util.concurrent.TimeUnit
+
+import jakarta.servlet.{Filter, FilterChain, FilterConfig, ServletException, 
ServletRequest, ServletResponse}
+import jakarta.servlet.http.{HttpServletRequest, HttpServletRequestWrapper, 
HttpServletResponse}
+import org.apache.commons.lang3.StringUtils
+import org.apache.hadoop.security.UserGroupInformation
+
+import org.apache.spark.internal.Logging
+
+// This class is inspired by 
org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter

Review Comment:
   I was considering porting the code as-is (also keeping the package and class 
name) as we did in the `thriftserver` module for `hive-service`, but it would 
cause class conflicts because `hadoop-client-minicluster` (present in test 
scope) ships those classes too. Meanwhile, I found an existing 
`YarnProxyRedirectFilter`, which is much simpler and is not coupled with Hadoop 
classes. So I renamed the class to `YarnAMIpFilter`, rewrote it in Scala, and 
put it alongside `YarnProxyRedirectFilter`. The UT `YarnAMIpFilterSuite` is 
ported too to ensure the correctness of the rewrite.
   
   @cloud-fan if we pursue sync with Hadoop upstream, I can convert it back to 
Java classes, and reuse the Hadoop code as much as possible. For example, use 
`org.apache.hadoop.yarn.webapp.hamlet2.Hamlet` to construct the HTML page in 
`sendRedirect` instead of writing by hand as we did in 
`YarnProxyRedirectFilter`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48238][BUILD][YARN] Replace AmIpFilter with re-implemented YarnAMIpFilter [spark]

2024-05-17 Thread via GitHub


pan3793 commented on code in PR #46611:
URL: https://github.com/apache/spark/pull/46611#discussion_r1605495540


##
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAMIpFilter.scala:
##
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.io.IOException
+import java.net.{HttpURLConnection, InetAddress, MalformedURLException, 
UnknownHostException, URL}
+import java.security.Principal
+import java.util.concurrent.TimeUnit
+
+import jakarta.servlet.{Filter, FilterChain, FilterConfig, ServletException, 
ServletRequest, ServletResponse}
+import jakarta.servlet.http.{HttpServletRequest, HttpServletRequestWrapper, 
HttpServletResponse}
+import org.apache.commons.lang3.StringUtils
+import org.apache.hadoop.security.UserGroupInformation
+
+import org.apache.spark.internal.Logging
+
+// This class is inspired by 
org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter

Review Comment:
   I was considering porting the code as-is (also keeping the package and class 
name) as we did in the `thriftserver` module for `hive-service`, but it would 
cause class conflicts because `hadoop-client-minicluster` (present in test 
scope) ships those classes too. Meanwhile, I found an existing 
`YarnProxyRedirectFilter`, which is much simpler and is not coupled with Hadoop 
classes. So I renamed the class to `YarnAMIpFilter`, rewrote it in Scala, and 
put it alongside `YarnProxyRedirectFilter`. The UT `YarnAMIpFilterSuite` is 
ported too to ensure the correctness of the rewrite.
   
   @cloud-fan if we pursue sync with Hadoop upstream, I can convert it back to 
Java classes, and reuse the Hadoop code as much as possible. For example, use 
`org.apache.hadoop.yarn.webapp.hamlet2.Hamlet` to construct the HTML page in 
`sendRedirect` instead of writing by hand as we did in 
`YarnProxyRedirectFilter`.



##
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAMIpFilter.scala:
##
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.io.IOException
+import java.net.{HttpURLConnection, InetAddress, MalformedURLException, 
UnknownHostException, URL}
+import java.security.Principal
+import java.util.concurrent.TimeUnit
+
+import jakarta.servlet.{Filter, FilterChain, FilterConfig, ServletException, 
ServletRequest, ServletResponse}
+import jakarta.servlet.http.{HttpServletRequest, HttpServletRequestWrapper, 
HttpServletResponse}
+import org.apache.commons.lang3.StringUtils
+import org.apache.hadoop.security.UserGroupInformation
+
+import org.apache.spark.internal.Logging
+
+// This class is inspired by 
org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter

Review Comment:
   I was considering porting the code as-is (also keeping the package and class 
name) as we did in the `thriftserver` module for `hive-service`, but it would 
cause class conflicts because `hadoop-client-minicluster` (present in test 
scope) ships those classes too. Meanwhile, I found an existing 
`YarnProxyRedirectFilter`, which is much simpler and is not coupled with Hadoop 
classes. So I renamed the class to `YarnAMIpFilter`, rewrote it in Scala, and 
put it alongside `YarnProxyRedirectFilter`. The UT `YarnAMIpFilterSuite` is 
ported too to ensure the correctness of the rewrite.
   
   @cloud-fan if we pursue sync with Hadoop upstream, I can convert it back to 
Java 

Re: [PR] [SPARK-48238][BUILD][YARN] Replace AmIpFilter with re-implemented YarnAMIpFilter [spark]

2024-05-17 Thread via GitHub


pan3793 commented on code in PR #46611:
URL: https://github.com/apache/spark/pull/46611#discussion_r1605495540


##
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAMIpFilter.scala:
##
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.io.IOException
+import java.net.{HttpURLConnection, InetAddress, MalformedURLException, 
UnknownHostException, URL}
+import java.security.Principal
+import java.util.concurrent.TimeUnit
+
+import jakarta.servlet.{Filter, FilterChain, FilterConfig, ServletException, 
ServletRequest, ServletResponse}
+import jakarta.servlet.http.{HttpServletRequest, HttpServletRequestWrapper, 
HttpServletResponse}
+import org.apache.commons.lang3.StringUtils
+import org.apache.hadoop.security.UserGroupInformation
+
+import org.apache.spark.internal.Logging
+
+// This class is inspired by 
org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter

Review Comment:
   I was considering porting the code as-is (also keeping the package and class 
name) as we did in the `thriftserver` module for `hive-service`, but it would 
cause class conflicts because `hadoop-client-minicluster` (present in test 
scope) ships those classes too. Meanwhile, I found an existing 
`YarnProxyRedirectFilter`, which is much simpler and is not coupled with Hadoop 
classes. So I renamed the class to `YarnAMIpFilter` and put it alongside 
`YarnProxyRedirectFilter`. The UT `YarnAMIpFilterSuite` is ported too to ensure 
the correctness of the rewrite.
   
   @cloud-fan if we pursue sync with Hadoop upstream, I can convert it back to 
Java classes, and reuse the Hadoop code as much as possible. For example, use 
`org.apache.hadoop.yarn.webapp.hamlet2.Hamlet` to construct the HTML page in 
`sendRedirect` instead of writing by hand as we did in 
`YarnProxyRedirectFilter`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46841][SQL] Add collation support for ICU locales and collation specifiers [spark]

2024-05-17 Thread via GitHub


mkaravel commented on code in PR #46180:
URL: https://github.com/apache/spark/pull/46180#discussion_r1605449462


##
common/unsafe/src/main/java/org/apache/spark/sql/catalyst/util/CollationFactory.java:
##
@@ -118,76 +119,433 @@ public Collation(
 }
 
 /**
- * Constructor with comparators that are inherited from the given collator.
+ * Collation id is defined as 32-bit integer.
+ * We specify binary layouts for different classes of collations.
+ * Classes of collations are differentiated by most significant 3 bits 
(bit 31, 30 and 29),
+ * bit 31 being most significant and bit 0 being least significant.
+ * ---
+ * INDETERMINATE collation id binary layout:
+ * bit 31-0: 1
+ * INDETERMINATE collation id is equal to -1
+ * ---
+ * user-defined collation id binary layout:
+ * bit 31:   0
+ * bit 30:   1
+ * bit 29-0: undefined, reserved for future use
+ * ---
+ * UTF8_BINARY collation id binary layout:
+ * bit 31-22: zeroes
+ * bit 21-18: zeroes, reserved for space trimming
+ * bit 17-16: zeroes, reserved for version
+ * bit 15-3:  zeroes
+ * bit 2: 0, reserved for accent sensitivity
+ * bit 1: 0, reserved for uppercase and case-insensitive
+ * bit 0: 0 = case-sensitive, 1 = lowercase
+ * ---
+ * ICU collation id binary layout:
+ * bit 31-30: zeroes
+ * bit 29:1
+ * bit 28-24: zeroes
+ * bit 23-22: zeroes, reserved for version

Review Comment:
   Is there a good reason why version appears at different positions for the 
UTF8_BINARY family and ICU collations?



##
common/unsafe/src/main/java/org/apache/spark/sql/catalyst/util/CollationFactory.java:
##
@@ -118,76 +119,433 @@ public Collation(
 }
 
 /**
- * Constructor with comparators that are inherited from the given collator.
+ * Collation id is defined as 32-bit integer.
+ * We specify binary layouts for different classes of collations.
+ * Classes of collations are differentiated by most significant 3 bits 
(bit 31, 30 and 29),
+ * bit 31 being most significant and bit 0 being least significant.
+ * ---
+ * INDETERMINATE collation id binary layout:
+ * bit 31-0: 1
+ * INDETERMINATE collation id is equal to -1
+ * ---
+ * user-defined collation id binary layout:
+ * bit 31:   0
+ * bit 30:   1
+ * bit 29-0: undefined, reserved for future use
+ * ---
+ * UTF8_BINARY collation id binary layout:
+ * bit 31-22: zeroes
+ * bit 21-18: zeroes, reserved for space trimming
+ * bit 17-16: zeroes, reserved for version
+ * bit 15-3:  zeroes
+ * bit 2: 0, reserved for accent sensitivity
+ * bit 1: 0, reserved for uppercase and case-insensitive
+ * bit 0: 0 = case-sensitive, 1 = lowercase
+ * ---
+ * ICU collation id binary layout:
+ * bit 31-30: zeroes
+ * bit 29:1
+ * bit 28-24: zeroes
+ * bit 23-22: zeroes, reserved for version
+ * bit 21-18: zeroes, reserved for space trimming
+ * bit 17:0 = case-sensitive, 1 = case-insensitive
+ * bit 16:0 = accent-sensitive, 1 = accent-insensitive
+ * bit 15-14: zeroes, reserved for punctuation sensitivity
+ * bit 13-12: zeroes, reserved for first letter preference
+ * bit 11-0:  locale id as specified in `ICULocaleToId` mapping
+ * ---
+ * Some illustrative examples of collation name to id mapping:
+ * - UTF8_BINARY       -> 0
+ * - UTF8_BINARY_LCASE -> 1
+ * - UNICODE           -> 0x20000000
+ * - UNICODE_AI        -> 0x20010000
+ * - UNICODE_CI        -> 0x20020000
+ * - UNICODE_CI_AI     -> 0x20030000
+ * - af                -> 0x20000001
+ * - af_CI_AI          -> 0x20030001
  */
-public Collation(
-String collationName,
-Collator collator,
-String version,
-boolean supportsBinaryEquality,
-boolean supportsBinaryOrdering,
-boolean supportsLowercaseEquality) {
-  this(
-collationName,
-collator,
-(s1, s2) -> collator.compare(s1.toString(), s2.toString()),
-version,
-s -> (long)collator.getCollationKey(s.toString()).hashCode(),
-supportsBinaryEquality,
-supportsBinaryOrdering,
-supportsLowercaseEquality);
+private abstract static class CollationSpec {
+
+  private enum DefinitionOrigin {

Review Comment:
   I do not want to block this PR for this, but I find the lack of comments for 
variables, inner classes, methods, etc. quite problematic. The only way to 
understand what is going on is by looking at the code, which could be okay in 
some cases, but it definitely does not help in understanding how the different 
pieces fit together.
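
   As a small aside, the id layout documented above is compact enough to restate 
as a documented helper, which might make the scheme easier to follow (an 
illustrative sketch only, not code from this PR):
   
   ```java
   // Compose an ICU collation id from the locale id and the CI/AI flags,
   // following the bit layout in the class comment above.
   static int icuCollationId(int localeId, boolean caseInsensitive, boolean accentInsensitive) {
     int id = 1 << 29;                // ICU collation class marker (bit 29)
     if (caseInsensitive) {
       id |= 1 << 17;                 // bit 17: case-insensitive
     }
     if (accentInsensitive) {
       id |= 1 << 16;                 // bit 16: accent-insensitive
     }
     return id | (localeId & 0xFFF);  // bits 11-0: locale id
   }
   ```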



##
common/unsafe/src/test/scala/org/apache/spark/unsafe/types/CollationFactorySuite.scala:
##
@@ -152,4 +219,218 @@ class CollationFactorySuite extends AnyFunSuite with 

Re: [PR] [SPARK-48238][BUILD][YARN] Replace AmIpFilter with re-implemented YarnAMIpFilter [spark]

2024-05-17 Thread via GitHub


pan3793 commented on code in PR #46611:
URL: https://github.com/apache/spark/pull/46611#discussion_r1605458503


##
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAMIpFilter.scala:
##
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.io.IOException
+import java.net.{HttpURLConnection, InetAddress, MalformedURLException, 
UnknownHostException, URL}
+import java.security.Principal
+import java.util.concurrent.TimeUnit
+
+import jakarta.servlet.{Filter, FilterChain, FilterConfig, ServletException, 
ServletRequest, ServletResponse}
+import jakarta.servlet.http.{HttpServletRequest, HttpServletRequestWrapper, 
HttpServletResponse}
+import org.apache.commons.lang3.StringUtils
+import org.apache.hadoop.security.UserGroupInformation
+
+import org.apache.spark.internal.Logging
+
+// This class is inspired by 
org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter

Review Comment:
   This is a functionally equivalent rewrite. It simplifies the code by 
converting it to Scala and merging code from several Hadoop Java classes.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46841][SQL] Add collation support for ICU locales and collation specifiers [spark]

2024-05-17 Thread via GitHub


mkaravel commented on code in PR #46180:
URL: https://github.com/apache/spark/pull/46180#discussion_r160575


##
connector/connect/common/src/main/protobuf/spark/connect/types.proto:
##
@@ -101,7 +101,7 @@ message DataType {
 
   message String {
 uint32 type_variation_reference = 1;
-uint32 collation_id = 2;
+string collation = 2;

Review Comment:
   It took me a bit, but I get it now. Thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[PR] [WIP] OperatorStateMetadata [spark]

2024-05-17 Thread via GitHub


ericm-db opened a new pull request, #46645:
URL: https://github.com/apache/spark/pull/46645

   
   
   ### What changes were proposed in this pull request?
   
   
   
   ### Why are the changes needed?
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   
   ### How was this patch tested?
   
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[PR] [WIP] [spark]

2024-05-17 Thread via GitHub


ericm-db opened a new pull request, #46644:
URL: https://github.com/apache/spark/pull/46644

   
   
   ### What changes were proposed in this pull request?
   
   
   
   ### Why are the changes needed?
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   
   ### How was this patch tested?
   
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[PR] [SPARK-48294][SQL][3.5] Handle lowercase in nestedTypeMissingElementTypeError [spark]

2024-05-17 Thread via GitHub


michaelzhan-db opened a new pull request, #46643:
URL: https://github.com/apache/spark/pull/46643

   
   
   ### What changes were proposed in this pull request?
   
   Backport of #46623.
   Handle lowercase values inside nestedTypeMissingElementTypeError to prevent 
match errors.
   
   
   ### Why are the changes needed?
   
   The previous match error was not user-friendly. Now it gives an actionable 
`INCOMPLETE_TYPE_DEFINITION` error.
   
   ### Does this PR introduce _any_ user-facing change?
   
   N/A
   
   ### How was this patch tested?
   
   Newly added tests pass.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   No.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47353][SQL] Enable collation support for the Mode expression using GroupMapReduce [V2] [spark]

2024-05-17 Thread via GitHub


uros-db commented on code in PR #46597:
URL: https://github.com/apache/spark/pull/46597#discussion_r1605139359


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala:
##
@@ -74,16 +80,28 @@ case class Mode(
 if (buffer.isEmpty) {
   return null
 }
-
+val collationAwareBuffer =
+  if 
(!CollationFactory.fetchCollation(collationId).supportsBinaryEquality) {
+val modeMap = buffer.toSeq.groupMapReduce {
+  case (key: String, _) =>
+CollationFactory.getCollationKey(UTF8String.fromString(key), 
collationId)
+  case (key: UTF8String, _) =>
+CollationFactory.getCollationKey(key, collationId)
+  case (key, _) => key
+}(x => x)((x, y) => (x._1, x._2 + y._2)).values
+modeMap
+  } else {
+buffer
+  }

Review Comment:
   also, you may be able to use just: 
`CollationFactory.getCollationKey(key.asInstanceOf[UTF8String], st.collationId)`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47353][SQL] Enable collation support for the Mode expression using GroupMapReduce [V2] [spark]

2024-05-17 Thread via GitHub


uros-db commented on code in PR #46597:
URL: https://github.com/apache/spark/pull/46597#discussion_r1605136854


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala:
##
@@ -74,16 +80,28 @@ case class Mode(
 if (buffer.isEmpty) {
   return null
 }
-
+val collationAwareBuffer =
+  if 
(!CollationFactory.fetchCollation(collationId).supportsBinaryEquality) {
+val modeMap = buffer.toSeq.groupMapReduce {
+  case (key: String, _) =>
+CollationFactory.getCollationKey(UTF8String.fromString(key), 
collationId)
+  case (key: UTF8String, _) =>
+CollationFactory.getCollationKey(key, collationId)
+  case (key, _) => key
+}(x => x)((x, y) => (x._1, x._2 + y._2)).values
+modeMap
+  } else {
+buffer
+  }

Review Comment:
   ```suggestion
   val collationAwareBuffer = child.dataType match {
 case st: StringType =>
 val modeMap = buffer.toSeq.groupMapReduce {
 case (key: String, _) =>
   CollationFactory.getCollationKey(UTF8String.fromString(key), 
st.collationId)
 case (key: UTF8String, _) =>
   CollationFactory.getCollationKey(key, st.collationId)
 case (key, _) => key
 }(x => x)((x, y) => (x._1, x._2 + y._2)).values
 modeMap
 case _ => buffer
   }
   ```
   
   (something like this)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [WIP][SPARK-47227][FOLLOW][DOCS] Building Extensions [spark]

2024-05-17 Thread via GitHub


nchammas commented on code in PR #45340:
URL: https://github.com/apache/spark/pull/45340#discussion_r1605122743


##
docs/spark-connect-extending.md:
##
@@ -0,0 +1,248 @@
+---
+layout: global
+title: Extending Spark Connect with Custom Functionality
+license: |
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+---
+
+Apache Spark provides different ways for users and developers to extend the
+system with custom functionality using many different extension points. Due to
+the separation of the client from the server, some of the extensions mechanism

Review Comment:
   Typo: extension[] mechanism[s]



##
docs/spark-connect-extending.md:
##
@@ -0,0 +1,248 @@
+---
+layout: global
+title: Extending Spark Connect with Custom Functionality
+license: |
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+---
+
+Apache Spark provides different ways for users and developers to extend the
+system with custom functionality using many different extension points. Due to
+the separation of the client from the server, some of the extensions mechanism
+are changing.
+
+This can be separated into two main categories: First, extensions that are
+purely client side and do not extend the way the Spark server is behaving.
+Second, extensions that interact directly with Spark and need to run directly 
on
+the driver.
+
+
+## Client Extensions
+
+- Client extensions are programs that operate purely on the client surface of
+  Apache Spark and do not require any server side changes.
+
+
+## Server Extensions
+
+One of the biggest benefits of Spark Connect is the ability to extend Apache
+Spark with extensions and then use them seamlessly in all clients from any
+programming language with relative minimal effort.
+
+Building an extension for Spark Connect is an approach aimed at enhancing the
+functionality and flexibility of Apache Spark. Spark Connect operates on three
+core primitives: relations, expressions, and commands, each serving a unique
+purpose within the data processing framework.
+
+Relations in Spark Connect are fundamental to dataset transformations, acting 
as
+the mechanism through which an optional input dataset is transformed into an
+output dataset. Conceptually, relations can be likened to tables within a
+database, manipulated to achieve desired outcomes. Their functionality closely
+mirrors that of the DataFrame API, providing a familiar and intuitive interface
+for data manipulation.
+
+Expressions form another critical component of Spark Connect, functioning as
+operations that can be applied to individual columns or a set of columns within
+a dataset. These expressions are akin to functions in programming, offering a
+level of granularity in data processing that is comparable to the operations
+available through the Column API. This allows users to easily extend Spark with
+custom expression functionality and make it available to all clients.
+
+Commands stand out within Spark Connect as distinct actions that can be
+executed. Unlike relations, which focus on the transformation and nesting of
+output data, commands represent singular operations that perform specific tasks
+on the data. An example of such a command is a Data Manipulation Language (DML)

Review Comment:
   How is DML different from a DataFrame transformation / relation?
   
   You didn't happen to mean Data Definition Language (DDL) instead, did you? 
e.g. `ALTER TABLE`



##
docs/spark-connect-extending.md:
##
@@ -0,0 +1,248 @@
+---

Re: [PR] [SPARK-47353][SQL] Enable collation support for the Mode expression using GroupMapReduce [V2] [spark]

2024-05-17 Thread via GitHub


uros-db commented on code in PR #46597:
URL: https://github.com/apache/spark/pull/46597#discussion_r1605132882


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala:
##
@@ -41,6 +42,11 @@ case class Mode(
 this(child, 0, 0, Some(reverse))
   }
 
+  final lazy val collationId: Int = child.dataType match {
+case c: StringType => c.collationId
+case _ => CollationFactory.UTF8_BINARY_COLLATION_ID
+  }

Review Comment:
   I see, child is just Expression in Mode
   
   in this case, I would say remove final lazy val collationId (since it's only 
used in 1 place anyways), and just do pattern matching below



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48238][BUILD][YARN] Replace AmIpFilter with re-implemented YarnAMIpFilter [spark]

2024-05-17 Thread via GitHub


cloud-fan commented on code in PR #46611:
URL: https://github.com/apache/spark/pull/46611#discussion_r1605076750


##
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAMIpFilter.scala:
##
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.io.IOException
+import java.net.{HttpURLConnection, InetAddress, MalformedURLException, 
UnknownHostException, URL}
+import java.security.Principal
+import java.util.concurrent.TimeUnit
+
+import jakarta.servlet.{Filter, FilterChain, FilterConfig, ServletException, 
ServletRequest, ServletResponse}
+import jakarta.servlet.http.{HttpServletRequest, HttpServletRequestWrapper, 
HttpServletResponse}
+import org.apache.commons.lang3.StringUtils
+import org.apache.hadoop.security.UserGroupInformation
+
+import org.apache.spark.internal.Logging
+
+// This class is inspired by 
org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter

Review Comment:
   If the original code is Java, I'd prefer we also add a Java file in Spark, 
so that it's easier to keep the code in sync.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48238][BUILD][YARN] Replace AmIpFilter with re-implemented YarnAMIpFilter [spark]

2024-05-17 Thread via GitHub


cloud-fan commented on PR #46611:
URL: https://github.com/apache/spark/pull/46611#issuecomment-2117700736

   This looks like a good idea to get rid of the conflicting dependency. We 
should clarify 
https://github.com/apache/spark/pull/46611#discussion_r1605001322 though.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[PR] [WIP] JDBC Connectors predicate pushdown testing [spark]

2024-05-17 Thread via GitHub


stefanbuk-db opened a new pull request, #46642:
URL: https://github.com/apache/spark/pull/46642

   
   
   ### What changes were proposed in this pull request?
   In this PR, I add a new trait with tests for integration testing of JDBC 
connectors. Also, I propose changes to `MsSqlServerDialect` to support more 
filter push downs.
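   
   For context, the core check such integration tests usually make is whether a 
filter actually reaches the source as a pushed predicate. A rough, hypothetical 
sketch (the endpoint and table names below are made up):
   
   ```scala
   // Hypothetical sketch: verify that a predicate is pushed down to the JDBC source.
   import org.apache.spark.sql.SparkSession
   import org.apache.spark.sql.functions.col

   val spark = SparkSession.builder().master("local[1]").getOrCreate()
   val df = spark.read
     .format("jdbc")
     .option("url", "jdbc:sqlserver://localhost;databaseName=test") // assumed test endpoint
     .option("dbtable", "people")                                   // assumed table
     .load()
     .filter(col("age") > 30)

   // When the dialect supports the predicate, the plan lists it under
   // PushedFilters instead of keeping a post-scan Filter node.
   df.explain(true)
   ```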
   
   
   ### Why are the changes needed?
   These changes are needed for better testing of JDBC connectors and general 
improvements of push down capabilities.
   
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   
   ### How was this patch tested?
   With added tests.
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   No
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47353][SQL] Enable collation support for the Mode expression using GroupMapReduce [V2] [spark]

2024-05-17 Thread via GitHub


GideonPotok commented on PR #46597:
URL: https://github.com/apache/spark/pull/46597#issuecomment-2117660366

   @uros-db This is all cleaned up. Let's get some of the other reviewers to 
look at it? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48210][DOC]Modify the description of whether dynamic partition… [spark]

2024-05-17 Thread via GitHub


tgravescs commented on PR #46496:
URL: https://github.com/apache/spark/pull/46496#issuecomment-2117624849

   what version of Spark are you using?  That issue I pointed to was fixed in 
4.0.0 and 3.5.1.
   
   The check and error that is on the main branch should read:
   
   ```
     if ((notRunningUnitTests || testExceptionThrown) &&
       !(isStandaloneOrLocalCluster || isYarn || isK8s)) {
       throw new SparkException("TaskResourceProfiles are only supported for Standalone, " +
         "Yarn and Kubernetes cluster for now when dynamic allocation is disabled.")
   ```
   
   
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala#L73
   
   I'm guessing you are using a version of Spark before that change.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48238][BUILD][YARN] Replace AmIpFilter with re-implemented YarnAMIpFilter [spark]

2024-05-17 Thread via GitHub


tgravescs commented on code in PR #46611:
URL: https://github.com/apache/spark/pull/46611#discussion_r1605001322


##
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAMIpFilter.scala:
##
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import java.io.IOException
+import java.net.{HttpURLConnection, InetAddress, MalformedURLException, 
UnknownHostException, URL}
+import java.security.Principal
+import java.util.concurrent.TimeUnit
+
+import jakarta.servlet.{Filter, FilterChain, FilterConfig, ServletException, 
ServletRequest, ServletResponse}
+import jakarta.servlet.http.{HttpServletRequest, HttpServletRequestWrapper, 
HttpServletResponse}
+import org.apache.commons.lang3.StringUtils
+import org.apache.hadoop.security.UserGroupInformation
+
+import org.apache.spark.internal.Logging
+
+// This class is inspired by 
org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter

Review Comment:
   can you clarify this: is it just a copy converted to Scala, or did you 
add/change functionality?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47818][CONNECT][FOLLOW-UP] Introduce plan cache in SparkConnectPlanner to improve performance of Analyze requests [spark]

2024-05-17 Thread via GitHub


vicennial commented on PR #46638:
URL: https://github.com/apache/spark/pull/46638#issuecomment-2117599067

   cc @hvanhovell 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[PR] [WIP][SQL] Enable hash aggregation support for all collations (StringType) [spark]

2024-05-17 Thread via GitHub


uros-db opened a new pull request, #46640:
URL: https://github.com/apache/spark/pull/46640

   
   
   ### What changes were proposed in this pull request?
   
   
   
   ### Why are the changes needed?
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   
   ### How was this patch tested?
   
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47353][SQL] Enable collation support for the Mode expression using GroupMapReduce [V2] [spark]

2024-05-17 Thread via GitHub


GideonPotok commented on code in PR #46597:
URL: https://github.com/apache/spark/pull/46597#discussion_r1604955280


##
sql/core/src/test/scala/org/apache/spark/sql/CollationSQLExpressionsSuite.scala:
##
@@ -1404,6 +1408,81 @@ class CollationSQLExpressionsSuite
 })
   }
 
+  test("Support mode for string expression with collation - Basic Test") {
+Seq("utf8_binary", "utf8_binary_lcase", "unicode_ci", "unicode").foreach { 
collationId =>
+  val query = s"SELECT mode(collate('abc', '${collationId}'))"
+  checkAnswer(sql(query), Row("abc"))
+  
assert(sql(query).schema.fields.head.dataType.sameType(StringType(collationId)))
+}
+  }
+
+  test("Support mode for string expression with collation - Advanced Test") {
+case class ModeTestCase[R](collationId: String, bufferValues: Map[String, 
Long], result: R)
+val testCases = Seq(
+  ModeTestCase("utf8_binary", Map("a" -> 3L, "b" -> 2L, "B" -> 2L), "a"),
+  ModeTestCase("utf8_binary_lcase", Map("a" -> 3L, "b" -> 2L, "B" -> 2L), 
"b"),
+  ModeTestCase("unicode_ci", Map("a" -> 3L, "b" -> 2L, "B" -> 2L), "b"),
+  ModeTestCase("unicode", Map("a" -> 3L, "b" -> 2L, "B" -> 2L), "a")
+)
+testCases.foreach(t => {
+  val valuesToAdd = t.bufferValues.map { case (elt, numRepeats) =>
+(0L to numRepeats).map(_ => s"('$elt')").mkString(",")
+  }.mkString(",")
+
+  withTable("t") {
+sql("CREATE TABLE t(i STRING) USING parquet")
+sql("INSERT INTO t VALUES " + valuesToAdd)
+val query = s"SELECT mode(collate(i, '${t.collationId}')) FROM t"
+checkAnswer(sql(query), Row(t.result))
+
assert(sql(query).schema.fields.head.dataType.sameType(StringType(t.collationId)))
+
+  }
+})
+  }
+
+  test("Support Mode.eval(buffer)") {
+case class ModeTestCase[R](
+collationId: String,
+bufferValues: Map[String, Long],
+result: R)
+case class UTF8StringModeTestCase[R](
+collationId: String,
+bufferValues: Map[UTF8String, Long],
+result: R)
+
+val bufferValues = Map("a" -> 5L, "b" -> 4L, "B" -> 3L, "d" -> 2L, "e" -> 
1L)
+val testCasesStrings = Seq(ModeTestCase("utf8_binary", bufferValues, "a"),
+  ModeTestCase("utf8_binary_lcase", bufferValues, "b"),
+  ModeTestCase("unicode_ci", bufferValues, "b"),
+  ModeTestCase("unicode", bufferValues, "a"))
+
+val bufferValuesUTF8String = Map(
+  UTF8String.fromString("a") -> 5L,
+  UTF8String.fromString("b") -> 4L,
+  UTF8String.fromString("B") -> 3L,
+  UTF8String.fromString("d") -> 2L,
+  UTF8String.fromString("e") -> 1L)
+
+val testCasesUTF8String = Seq(
+  UTF8StringModeTestCase("utf8_binary", bufferValuesUTF8String, "a"),
+  UTF8StringModeTestCase("utf8_binary_lcase", bufferValuesUTF8String, "b"),
+  UTF8StringModeTestCase("unicode_ci", bufferValuesUTF8String, "b"),
+  UTF8StringModeTestCase("unicode", bufferValuesUTF8String, "a"))
+
+testCasesStrings.foreach(t => {
+  val buffer = new OpenHashMap[AnyRef, Long](5)
+  val myMode = Mode(child = Literal.create("some_column_name", 
StringType(t.collationId)))
+  t.bufferValues.foreach { case (k, v) => buffer.update(k, v) }
+  assert(myMode.eval(buffer).toString.toLowerCase() == 
t.result.toLowerCase())
+})
+
+testCasesUTF8String.foreach(t => {
+  val buffer = new OpenHashMap[AnyRef, Long](5)
+  val myMode = Mode(child = Literal.create("some_column_name", 
StringType(t.collationId)))
+  t.bufferValues.foreach { case (k, v) => buffer.update(k, v) }
+  assert(myMode.eval(buffer).toString.toLowerCase() == 
t.result.toLowerCase())
+})
+  }

Review Comment:
   I would love to add some E2E Benchmark tests. But it is okay if not. 
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47353][SQL] Enable collation support for the Mode expression using GroupMapReduce [V2] [spark]

2024-05-17 Thread via GitHub


GideonPotok commented on code in PR #46597:
URL: https://github.com/apache/spark/pull/46597#discussion_r1604955632


##
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/CollationBenchmark.scala:
##
@@ -185,6 +189,32 @@ abstract class CollationBenchmarkBase extends 
BenchmarkBase {
 }
 benchmark.run()
   }
+
+  def benchmarkMode(

Review Comment:
   I would love to add some E2E Benchmark tests. But it is okay if not.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47353][SQL] Enable collation support for the Mode expression using GroupMapReduce [V2] [spark]

2024-05-17 Thread via GitHub


GideonPotok commented on code in PR #46597:
URL: https://github.com/apache/spark/pull/46597#discussion_r1604951157


##
sql/core/benchmarks/CollationBenchmark-jdk21-results.txt:
##
@@ -1,54 +1,63 @@
-OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1018-azure
+OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1021-azure
 AMD EPYC 7763 64-Core Processor
 collation unit benchmarks - equalsFunction:  Best Time(ms)   Avg Time(ms)   
Stdev(ms)Rate(M/s)   Per Row(ns)   Relative
 
--
-UTF8_BINARY_LCASE2948   2958   
   13  0.0   29483.6   1.0X
-UNICODE  2040   2042   
3  0.0   20396.6   1.4X
-UTF8_BINARY  2043   2043   
0  0.0   20426.3   1.4X
-UNICODE_CI  16318  16338   
   28  0.0  163178.4   0.2X
+UTF8_BINARY_LCASE2896   2898   
3  0.0   28958.7   1.0X
+UNICODE  2038   2040   
3  0.0   20377.5   1.4X
+UTF8_BINARY  2053   2054   
1  0.0   20534.9   1.4X
+UNICODE_CI  16779  16802   
   34  0.0  167785.2   0.2X
 
-OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1018-azure
+OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1021-azure
 AMD EPYC 7763 64-Core Processor
 collation unit benchmarks - compareFunction:  Best Time(ms)   Avg Time(ms)   
Stdev(ms)Rate(M/s)   Per Row(ns)   Relative
 
---
-UTF8_BINARY_LCASE 3227   3228  
 1  0.0   32272.1   1.0X
-UNICODE  16637  16643  
 9  0.0  166367.7   0.2X
-UTF8_BINARY   3132   3137  
 7  0.0   31319.2   1.0X
-UNICODE_CI   17816  17829  
18  0.0  178162.4   0.2X
+UTF8_BINARY_LCASE 4705   4705  
 0  0.0   47048.0   1.0X
+UNICODE  18863  18867  
 6  0.0  188625.3   0.2X
+UTF8_BINARY   4894   4901  
11  0.0   48936.8   1.0X
+UNICODE_CI   19595  19598  
 4  0.0  195953.0   0.2X
 
-OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1018-azure
+OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1021-azure
 AMD EPYC 7763 64-Core Processor
 collation unit benchmarks - hashFunction:  Best Time(ms)   Avg Time(ms)   
Stdev(ms)Rate(M/s)   Per Row(ns)   Relative
 

-UTF8_BINARY_LCASE  4824   4824 
  0  0.0   48243.7   1.0X
-UNICODE   69416  69475 
 84  0.0  694158.3   0.1X
-UTF8_BINARY3806   3808 
  2  0.0   38062.8   1.3X
-UNICODE_CI60943  60975 
 45  0.0  609426.2   0.1X
+UTF8_BINARY_LCASE  5011   5013 
  2  0.0   50113.1   1.0X
+UNICODE   68309  68319 
 13  0.0  683094.7   0.1X
+UTF8_BINARY3887   3887 
  1  0.0   38869.8   1.3X
+UNICODE_CI56675  56686 
 15  0.0  566750.3   0.1X
 
-OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1018-azure
+OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1021-azure
 AMD EPYC 7763 64-Core Processor
 collation unit benchmarks - contains: Best Time(ms)   Avg Time(ms)   
Stdev(ms)Rate(M/s)   Per Row(ns)   Relative
 

-UTF8_BINARY_LCASE 11979  11980 
  1  0.0  119790.4   1.0X
-UNICODE

Re: [PR] [SPARK-47353][SQL] Enable collation support for the Mode expression using GroupMapReduce [V2] [spark]

2024-05-17 Thread via GitHub


GideonPotok commented on code in PR #46597:
URL: https://github.com/apache/spark/pull/46597#discussion_r1604935916


##
sql/core/benchmarks/CollationBenchmark-jdk21-results.txt:
##
@@ -1,54 +1,63 @@
-OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1018-azure
+OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1021-azure
 AMD EPYC 7763 64-Core Processor
 collation unit benchmarks - equalsFunction:  Best Time(ms)   Avg Time(ms)   
Stdev(ms)Rate(M/s)   Per Row(ns)   Relative
 
--
-UTF8_BINARY_LCASE2948   2958   
   13  0.0   29483.6   1.0X
-UNICODE  2040   2042   
3  0.0   20396.6   1.4X
-UTF8_BINARY  2043   2043   
0  0.0   20426.3   1.4X
-UNICODE_CI  16318  16338   
   28  0.0  163178.4   0.2X
+UTF8_BINARY_LCASE2896   2898   
3  0.0   28958.7   1.0X
+UNICODE  2038   2040   
3  0.0   20377.5   1.4X
+UTF8_BINARY  2053   2054   
1  0.0   20534.9   1.4X
+UNICODE_CI  16779  16802   
   34  0.0  167785.2   0.2X
 
-OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1018-azure
+OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1021-azure
 AMD EPYC 7763 64-Core Processor
 collation unit benchmarks - compareFunction:  Best Time(ms)   Avg Time(ms)   
Stdev(ms)Rate(M/s)   Per Row(ns)   Relative
 
---
-UTF8_BINARY_LCASE 3227   3228  
 1  0.0   32272.1   1.0X
-UNICODE  16637  16643  
 9  0.0  166367.7   0.2X
-UTF8_BINARY   3132   3137  
 7  0.0   31319.2   1.0X
-UNICODE_CI   17816  17829  
18  0.0  178162.4   0.2X
+UTF8_BINARY_LCASE 4705   4705  
 0  0.0   47048.0   1.0X
+UNICODE  18863  18867  
 6  0.0  188625.3   0.2X
+UTF8_BINARY   4894   4901  
11  0.0   48936.8   1.0X
+UNICODE_CI   19595  19598  
 4  0.0  195953.0   0.2X
 
-OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1018-azure
+OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1021-azure
 AMD EPYC 7763 64-Core Processor
 collation unit benchmarks - hashFunction:  Best Time(ms)   Avg Time(ms)   
Stdev(ms)Rate(M/s)   Per Row(ns)   Relative
 

-UTF8_BINARY_LCASE  4824   4824 
  0  0.0   48243.7   1.0X
-UNICODE   69416  69475 
 84  0.0  694158.3   0.1X
-UTF8_BINARY3806   3808 
  2  0.0   38062.8   1.3X
-UNICODE_CI60943  60975 
 45  0.0  609426.2   0.1X
+UTF8_BINARY_LCASE  5011   5013 
  2  0.0   50113.1   1.0X
+UNICODE   68309  68319 
 13  0.0  683094.7   0.1X
+UTF8_BINARY3887   3887 
  1  0.0   38869.8   1.3X
+UNICODE_CI56675  56686 
 15  0.0  566750.3   0.1X
 
-OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1018-azure
+OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1021-azure
 AMD EPYC 7763 64-Core Processor
 collation unit benchmarks - contains: Best Time(ms)   Avg Time(ms)   
Stdev(ms)Rate(M/s)   Per Row(ns)   Relative
 

-UTF8_BINARY_LCASE 11979  11980 
  1  0.0  119790.4   1.0X
-UNICODE

Re: [PR] [SPARK-47353][SQL] Enable collation support for the Mode expression using GroupMapReduce [V2] [spark]

2024-05-17 Thread via GitHub


GideonPotok commented on code in PR #46597:
URL: https://github.com/apache/spark/pull/46597#discussion_r1604945722


##
sql/core/benchmarks/CollationBenchmark-jdk21-results.txt:
##
@@ -1,54 +1,63 @@
-OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1018-azure
+OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1021-azure
 AMD EPYC 7763 64-Core Processor
 collation unit benchmarks - equalsFunction:  Best Time(ms)   Avg Time(ms)   
Stdev(ms)Rate(M/s)   Per Row(ns)   Relative
 
--
-UTF8_BINARY_LCASE2948   2958   
   13  0.0   29483.6   1.0X
-UNICODE  2040   2042   
3  0.0   20396.6   1.4X
-UTF8_BINARY  2043   2043   
0  0.0   20426.3   1.4X
-UNICODE_CI  16318  16338   
   28  0.0  163178.4   0.2X
+UTF8_BINARY_LCASE2896   2898   
3  0.0   28958.7   1.0X
+UNICODE  2038   2040   
3  0.0   20377.5   1.4X
+UTF8_BINARY  2053   2054   
1  0.0   20534.9   1.4X
+UNICODE_CI  16779  16802   
   34  0.0  167785.2   0.2X
 
-OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1018-azure
+OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1021-azure
 AMD EPYC 7763 64-Core Processor
 collation unit benchmarks - compareFunction:  Best Time(ms)   Avg Time(ms)   
Stdev(ms)Rate(M/s)   Per Row(ns)   Relative
 
---
-UTF8_BINARY_LCASE 3227   3228  
 1  0.0   32272.1   1.0X
-UNICODE  16637  16643  
 9  0.0  166367.7   0.2X
-UTF8_BINARY   3132   3137  
 7  0.0   31319.2   1.0X
-UNICODE_CI   17816  17829  
18  0.0  178162.4   0.2X
+UTF8_BINARY_LCASE 4705   4705  
 0  0.0   47048.0   1.0X
+UNICODE  18863  18867  
 6  0.0  188625.3   0.2X
+UTF8_BINARY   4894   4901  
11  0.0   48936.8   1.0X
+UNICODE_CI   19595  19598  
 4  0.0  195953.0   0.2X
 
-OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1018-azure
+OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1021-azure
 AMD EPYC 7763 64-Core Processor
 collation unit benchmarks - hashFunction:  Best Time(ms)   Avg Time(ms)   
Stdev(ms)Rate(M/s)   Per Row(ns)   Relative
 

-UTF8_BINARY_LCASE  4824   4824 
  0  0.0   48243.7   1.0X
-UNICODE   69416  69475 
 84  0.0  694158.3   0.1X
-UTF8_BINARY3806   3808 
  2  0.0   38062.8   1.3X
-UNICODE_CI60943  60975 
 45  0.0  609426.2   0.1X
+UTF8_BINARY_LCASE  5011   5013 
  2  0.0   50113.1   1.0X
+UNICODE   68309  68319 
 13  0.0  683094.7   0.1X
+UTF8_BINARY3887   3887 
  1  0.0   38869.8   1.3X
+UNICODE_CI56675  56686 
 15  0.0  566750.3   0.1X
 
-OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1018-azure
+OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1021-azure
 AMD EPYC 7763 64-Core Processor
 collation unit benchmarks - contains: Best Time(ms)   Avg Time(ms)   
Stdev(ms)Rate(M/s)   Per Row(ns)   Relative
 

-UTF8_BINARY_LCASE 11979  11980 
  1  0.0  119790.4   1.0X
-UNICODE

Re: [PR] [SPARK-47353][SQL] Enable collation support for the Mode expression using GroupMapReduce [V2] [spark]

2024-05-17 Thread via GitHub


GideonPotok commented on code in PR #46597:
URL: https://github.com/apache/spark/pull/46597#discussion_r1604944495


##
sql/core/benchmarks/CollationBenchmark-jdk21-results.txt:
##
@@ -1,54 +1,63 @@
-OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1018-azure
+OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1021-azure
 AMD EPYC 7763 64-Core Processor
 collation unit benchmarks - equalsFunction:  Best Time(ms)   Avg Time(ms)   
Stdev(ms)Rate(M/s)   Per Row(ns)   Relative
 
--
-UTF8_BINARY_LCASE2948   2958   
   13  0.0   29483.6   1.0X
-UNICODE  2040   2042   
3  0.0   20396.6   1.4X
-UTF8_BINARY  2043   2043   
0  0.0   20426.3   1.4X
-UNICODE_CI  16318  16338   
   28  0.0  163178.4   0.2X
+UTF8_BINARY_LCASE2896   2898   
3  0.0   28958.7   1.0X
+UNICODE  2038   2040   
3  0.0   20377.5   1.4X
+UTF8_BINARY  2053   2054   
1  0.0   20534.9   1.4X
+UNICODE_CI  16779  16802   
   34  0.0  167785.2   0.2X
 
-OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1018-azure
+OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1021-azure
 AMD EPYC 7763 64-Core Processor
 collation unit benchmarks - compareFunction:  Best Time(ms)   Avg Time(ms)   
Stdev(ms)Rate(M/s)   Per Row(ns)   Relative
 
---
-UTF8_BINARY_LCASE 3227   3228  
 1  0.0   32272.1   1.0X
-UNICODE  16637  16643  
 9  0.0  166367.7   0.2X
-UTF8_BINARY   3132   3137  
 7  0.0   31319.2   1.0X
-UNICODE_CI   17816  17829  
18  0.0  178162.4   0.2X
+UTF8_BINARY_LCASE 4705   4705  
 0  0.0   47048.0   1.0X
+UNICODE  18863  18867  
 6  0.0  188625.3   0.2X
+UTF8_BINARY   4894   4901  
11  0.0   48936.8   1.0X
+UNICODE_CI   19595  19598  
 4  0.0  195953.0   0.2X
 
-OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1018-azure
+OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1021-azure
 AMD EPYC 7763 64-Core Processor
 collation unit benchmarks - hashFunction:  Best Time(ms)   Avg Time(ms)   
Stdev(ms)Rate(M/s)   Per Row(ns)   Relative
 

-UTF8_BINARY_LCASE  4824   4824 
  0  0.0   48243.7   1.0X
-UNICODE   69416  69475 
 84  0.0  694158.3   0.1X
-UTF8_BINARY3806   3808 
  2  0.0   38062.8   1.3X
-UNICODE_CI60943  60975 
 45  0.0  609426.2   0.1X
+UTF8_BINARY_LCASE  5011   5013 
  2  0.0   50113.1   1.0X
+UNICODE   68309  68319 
 13  0.0  683094.7   0.1X
+UTF8_BINARY3887   3887 
  1  0.0   38869.8   1.3X
+UNICODE_CI56675  56686 
 15  0.0  566750.3   0.1X
 
-OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1018-azure
+OpenJDK 64-Bit Server VM 21.0.3+9-LTS on Linux 6.5.0-1021-azure
 AMD EPYC 7763 64-Core Processor
 collation unit benchmarks - contains: Best Time(ms)   Avg Time(ms)   
Stdev(ms)Rate(M/s)   Per Row(ns)   Relative
 

-UTF8_BINARY_LCASE 11979  11980 
  1  0.0  119790.4   1.0X
-UNICODE

Re: [PR] [SPARK-47353][SQL] Enable collation support for the Mode expression using GroupMapReduce [V2] [spark]

2024-05-17 Thread via GitHub


GideonPotok commented on code in PR #46597:
URL: https://github.com/apache/spark/pull/46597#discussion_r1604942334


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala:
##
@@ -41,6 +42,11 @@ case class Mode(
 this(child, 0, 0, Some(reverse))
   }
 
+  final lazy val collationId: Int = child.dataType match {

Review Comment:
   if we use `asInstanceOf[StringType]` we will get an exception whenever the 
child is a numeric type or any other non-string type.
   
   Ideally, for a non-string type, collationId could be -1 or similar, but then 
CollationFactory.fetchCollation(collationId) would throw an exception. And it is 
not really precise to rely on 
`!CollationFactory.fetchCollation(CollationFactory.UTF8_BINARY_COLLATION_ID).supportsBinaryEquality`
 when the type is non-string just to fall through to the non-collation logic.
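   
   A tiny illustration of the difference (sketch only, not the PR's code):
   
   ```scala
   // Illustrative only: why a plain cast is unsafe for non-string children.
   import org.apache.spark.sql.types._

   val dt: DataType = IntegerType
   // dt.asInstanceOf[StringType].collationId       // would throw ClassCastException
   val collationId: Option[Int] = dt match {         // safe: non-string types fall through
     case st: StringType => Some(st.collationId)
     case _ => None
   }
   ```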



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47353][SQL] Enable collation support for the Mode expression using GroupMapReduce [V2] [spark]

2024-05-17 Thread via GitHub


GideonPotok commented on code in PR #46597:
URL: https://github.com/apache/spark/pull/46597#discussion_r1604934461


##
sql/core/benchmarks/CollationBenchmark-results.txt:
##
@@ -1,54 +1,63 @@
-OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Linux 6.5.0-1018-azure
+OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Linux 6.5.0-1021-azure
 AMD EPYC 7763 64-Core Processor
 collation unit benchmarks - equalsFunction:  Best Time(ms)   Avg Time(ms)   
Stdev(ms)Rate(M/s)   Per Row(ns)   Relative
 
--
-UTF8_BINARY_LCASE3571   3576   
7  0.0   35708.8   1.0X
-UNICODE  2235   2240   
7  0.0   22349.2   1.6X
-UTF8_BINARY  2237   2242   
6  0.0   22371.7   1.6X
-UNICODE_CI  18733  18817   
  118  0.0  187333.8   0.2X
+UTF8_BINARY_LCASE3241   3252   
   16  0.0   32413.8   1.0X
+UNICODE  2080   2082   
3  0.0   20800.9   1.6X
+UTF8_BINARY  2081   2083   
2  0.0   20814.2   1.6X
+UNICODE_CI  17364  17384   
   27  0.0  173644.2   0.2X
 
-OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Linux 6.5.0-1018-azure
+OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Linux 6.5.0-1021-azure
 AMD EPYC 7763 64-Core Processor
 collation unit benchmarks - compareFunction:  Best Time(ms)   Avg Time(ms)   
Stdev(ms)Rate(M/s)   Per Row(ns)   Relative
 
---
-UTF8_BINARY_LCASE 4260   4290  
41  0.0   42602.6   1.0X
-UNICODE  19536  19624  
   124  0.0  195360.2   0.2X
-UTF8_BINARY   3582   3612  
43  0.0   35818.5   1.2X
-UNICODE_CI   20381  20454  
   103  0.0  203814.1   0.2X
+UTF8_BINARY_LCASE 3614   3615  
 1  0.0   36142.6   1.0X
+UNICODE  18575  18585  
15  0.0  185747.7   0.2X
+UTF8_BINARY   3311   3326  
21  0.0   33111.6   1.1X
+UNICODE_CI   19241  19249  
11  0.0  192409.4   0.2X
 
-OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Linux 6.5.0-1018-azure
+OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Linux 6.5.0-1021-azure
 AMD EPYC 7763 64-Core Processor
 collation unit benchmarks - hashFunction:  Best Time(ms)   Avg Time(ms)   
Stdev(ms)Rate(M/s)   Per Row(ns)   Relative
 

-UTF8_BINARY_LCASE  7347   7349 
  3  0.0   73467.1   1.0X
-UNICODE   73462  73608 
206  0.0  734623.2   0.1X
-UTF8_BINARY5775   5815 
 57  0.0   57746.0   1.3X
-UNICODE_CI57543  57619 
108  0.0  575425.2   0.1X
+UTF8_BINARY_LCASE  6928   6929 
  1  0.0   69276.9   1.0X
+UNICODE   65674  65693 
 27  0.0  656737.6   0.1X
+UTF8_BINARY5440   5457 
 23  0.0   54403.2   1.3X
+UNICODE_CI60549  60605 
 79  0.0  605488.5   0.1X
 
-OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Linux 6.5.0-1018-azure
+OpenJDK 64-Bit Server VM 17.0.11+9-LTS on Linux 6.5.0-1021-azure
 AMD EPYC 7763 64-Core Processor
 collation unit benchmarks - contains: Best Time(ms)   Avg Time(ms)   
Stdev(ms)Rate(M/s)   Per Row(ns)   Relative
 

-UTF8_BINARY_LCASE 15415  15424 
 13  0.0  154147.1   1.0X
-UNICODE  

Re: [PR] [SPARK-48312][SQL] Improve Alias.removeNonInheritableMetadata performance [spark]

2024-05-17 Thread via GitHub


cloud-fan closed pull request #46622: [SPARK-48312][SQL] Improve 
Alias.removeNonInheritableMetadata performance
URL: https://github.com/apache/spark/pull/46622


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48312][SQL] Improve Alias.removeNonInheritableMetadata performance [spark]

2024-05-17 Thread via GitHub


cloud-fan commented on PR #46622:
URL: https://github.com/apache/spark/pull/46622#issuecomment-2117503274

   thanks, merging to master!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46841][SQL] Add collation support for ICU locales and collation specifiers [spark]

2024-05-17 Thread via GitHub


dbatomic commented on code in PR #46180:
URL: https://github.com/apache/spark/pull/46180#discussion_r1604917892


##
connector/connect/common/src/main/protobuf/spark/connect/types.proto:
##
@@ -101,7 +101,7 @@ message DataType {
 
   message String {
 uint32 type_variation_reference = 1;
-uint32 collation_id = 2;
+string collation = 2;

Review Comment:
   And it makes sense. The collation id should be an internal Spark concept; outside 
of Spark you should always use the collation name.
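   
   A small sketch of that flow (illustrative; the exact resolution code on the 
server side may differ):
   
   ```scala
   // Clients send the collation *name* over the wire; the server resolves it
   // back to its internal representation, e.g. via CollationFactory.
   import org.apache.spark.sql.catalyst.util.CollationFactory

   val collation = CollationFactory.fetchCollation("UNICODE_CI") // lookup by name
   ```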



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46841][SQL] Add collation support for ICU locales and collation specifiers [spark]

2024-05-17 Thread via GitHub


dbatomic commented on code in PR #46180:
URL: https://github.com/apache/spark/pull/46180#discussion_r1604916810


##
common/unsafe/src/test/scala/org/apache/spark/unsafe/types/CollationFactorySuite.scala:
##
@@ -152,4 +231,168 @@ class CollationFactorySuite extends AnyFunSuite with 
Matchers { // scalastyle:ig
   }
 })
   }
+
+  test("test collation caching") {
+Seq(
+  "UTF8_BINARY",
+  "UTF8_BINARY_LCASE",
+  "UTF8_BINARY_UCASE",
+  "UNICODE",
+  "UNICODE_LCASE",
+  "UNICODE_UCASE",
+  "UNICODE_CI",
+  "UNICODE_AI_CI",
+  "UNICODE_AI_CI_LCASE",
+  "UNICODE_AI_CI_UCASE"
+).foreach(collationId => {
+  val col1 = fetchCollation(collationId)
+  val col2 = fetchCollation(collationId)
+  assert(col1 eq col2) // reference equality
+})
+  }
+
+  test("collations with ICU non-root localization") {
+Seq(
+  // language only
+  "en",
+  "en_CS",
+  "en_CI",
+  "en_AS",
+  "en_AI",
+  "en_LCASE",
+  "en_UCASE",
+  // language + 3-letter country code
+  "en_USA",
+  "en_USA_CS",
+  "en_USA_CI",
+  "en_USA_AS",
+  "en_USA_AI",
+  "en_USA_LCASE",
+  "en_USA_UCASE",
+  // language + script code
+  "sr_Cyrl",
+  "sr_Cyrl_CS",
+  "sr_Cyrl_CI",
+  "sr_Cyrl_AS",
+  "sr_Cyrl_AI",
+  "sr_Cyrl_LCASE",
+  "sr_Cyrl_UCASE",
+  // language + script code + 3-letter country code
+  "sr_Cyrl_SRB",
+  "sr_Cyrl_SRB_CS",
+  "sr_Cyrl_SRB_CI",
+  "sr_Cyrl_SRB_AS",
+  "sr_Cyrl_SRB_AI",
+  "sr_Cyrl_SRB_LCASE",
+  "sr_Cyrl_SRB_UCASE"
+).foreach(collationICU => {
+  val col = fetchCollation(collationICU)
+  assert(col.collator.getLocale(ULocale.VALID_LOCALE) != ULocale.ROOT)
+})
+  }
+
+  test("invalid names of collations with ICU non-root localization") {
+Seq(
+  "en_US", // must use 3-letter country code
+  "enn",
+  "en_AAA",
+  "en_Something",
+  "en_Something_USA",
+  "en_CI_UNSPECIFIED",
+  "en_USA_UNSPECIFIED",
+  "en_USA_UNSPECIFIED_CI",
+  "en_INDETERMINATE",
+  "en_USA_INDETERMINATE",
+  "en_Latn_USA", // use en_USA instead
+  "en_Cyrl_USA",
+  "en_USA_AAA",
+  "sr_Cyrl_SRB_AAA"
+).foreach(collationName => {
+  val error = intercept[SparkException] {
+fetchCollation(collationName)
+  }
+
+  assert(error.getErrorClass === "COLLATION_INVALID_NAME")
+  assert(error.getMessageParameters.asScala === Map("collationName" -> 
collationName))
+})
+  }
+
+  test("collations name normalization for ICU non-root localization") {
+Seq(
+  ("en_USA", "en_USA"),
+  ("en_CS", "en"),
+  ("en_AS", "en"),
+  ("en_CS_AS", "en"),
+  ("en_AS_CS", "en"),
+  ("en_CI", "en_CI"),
+  ("en_AI", "en_AI"),
+  ("en_AI_CI", "en_CI_AI"),
+  ("en_USA_AI_CI", "en_USA_CI_AI"),
+  ("en_USA_LCASE_AI_CI", "en_USA_CI_AI_LCASE"),
+  ("en_USA_LCASE_CI_AI", "en_USA_CI_AI_LCASE"),
+  ("en_USA_AI_LCASE_CI", "en_USA_CI_AI_LCASE"),
+  ("en_USA_CI_LCASE_AI", "en_USA_CI_AI_LCASE"),
+  // randomized case
+  ("EN_USA", "en_USA"),
+  ("eN_usA_ci_uCASe_aI", "en_USA_CI_AI_UCASE"),
+  ("SR_CYRL", "sr_Cyrl"),
+  ("sr_cyrl_srb", "sr_Cyrl_SRB"),
+  ("sR_cYRl_sRb", "sr_Cyrl_SRB")
+).foreach {
+  case (name, normalized) =>
+val col = fetchCollation(name)
+assert(col.collationName == normalized)
+}
+  }
+
+  test("invalid collationId") {
+val badCollationIds = Seq(
+  -1, // user-defined collation range
+  1 << 31, // user-defined collation range
+  1 << 12, // utf8-binary mandatory zero bit 12 breach
+  1 << 13, // utf8-binary mandatory zero bit 13 breach
+  1 << 14, // utf8-binary mandatory zero bit 14 breach
+  1 << 15, // utf8-binary mandatory zero bit 15 breach
+  1 << 16, // utf8-binary mandatory zero bit 16 breach
+  1 << 17, // utf8-binary mandatory zero bit 17 breach
+  1 << 18, // utf8-binary mandatory zero bit 18 breach
+  1 << 19, // utf8-binary mandatory zero bit 19 breach
+  1 << 20, // utf8-binary mandatory zero bit 20 breach
+  1 << 23, // utf8-binary mandatory zero bit 23 breach
+  1 << 24, // utf8-binary mandatory zero bit 24 breach
+  1 << 25, // utf8-binary mandatory zero bit 25 breach
+  1 << 26, // utf8-binary mandatory zero bit 26 breach
+  (1 << 29) | (1 << 12), // ICU mandatory zero bit 12 breach
+  (1 << 29) | (1 << 13), // ICU mandatory zero bit 13 breach
+  (1 << 29) | (1 << 14), // ICU mandatory zero bit 14 breach
+  (1 << 29) | (1 << 15), // ICU mandatory zero bit 15 breach
+  (1 << 29) | (1 << 16), // ICU mandatory zero bit 16 breach
+  (1 << 29) | (1 << 17), // ICU mandatory zero bit 17 breach
+  (1 << 29) | (1 << 18), // ICU mandatory zero bit 18 breach
+  (1 << 29) | (1 << 19), // ICU mandatory zero bit 19 breach
+  (1 << 29) | (1 

[PR] [SPARK-48324][SQL] Codegen Support for `hll_sketch_estimate` and `hll_union` [spark]

2024-05-17 Thread via GitHub


panbingkun opened a new pull request, #46639:
URL: https://github.com/apache/spark/pull/46639

   ### What changes were proposed in this pull request?
   The PR adds `Codegen Support` for `hll_sketch_estimate` and `hll_union`.
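   
   For reference, the two expressions in user-facing form (behavior is unchanged; 
codegen only changes how they are evaluated), assuming an active `spark` session:
   
   ```scala
   spark.sql("""
     SELECT hll_sketch_estimate(hll_union(s1, s2)) AS approx_distinct
     FROM (SELECT hll_sketch_agg(col1) AS s1, hll_sketch_agg(col2) AS s2
           FROM VALUES (1, 4), (1, 5), (2, 5), (3, 6) AS t(col1, col2))
   """).show()
   ```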
   
   ### Why are the changes needed?
   Improve codegen coverage.
   
   
   ### Does this PR introduce _any_ user-facing change?
   No.
   
   
   ### How was this patch tested?
   - Add new UT.
   - Pass GA.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   No.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48175][SQL][PYTHON] Store collation information in metadata and not in type for SER/DE [spark]

2024-05-17 Thread via GitHub


stefankandic commented on PR #46280:
URL: https://github.com/apache/spark/pull/46280#issuecomment-2117427722

   @cloud-fan all checks passing, can we merge this?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[PR] [SPARK-47818][CONNECT][FOLLOW-UP] Introduce plan cache in SparkConnectPlanner to improve performance of Analyze requests [spark]

2024-05-17 Thread via GitHub


xi-db opened a new pull request, #46638:
URL: https://github.com/apache/spark/pull/46638

   ### What changes were proposed in this pull request?
   
   In [this previous PR](https://github.com/apache/spark/pull/46012), we 
introduced two new confs for the newly introduced plan cache: a static conf 
`spark.connect.session.planCache.maxSize` and a dynamic conf 
`spark.connect.session.planCache.enabled`. The plan cache is enabled by default 
with size 5. In this PR, we are marking them as internal because we don't 
expect users to deal with them.
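   
   For reference, "internal" refers to the flag on Spark's config builder; a 
minimal illustration with the generic builder (note: this is internal Spark API, 
private[spark], shown only to illustrate what marking a conf internal means; the 
actual definitions live in the Connect config object):
   
   ```scala
   // Illustrative only: .internal() keeps a conf out of the generated configuration docs.
   import org.apache.spark.internal.config.ConfigBuilder

   val PLAN_CACHE_SIZE = ConfigBuilder("spark.connect.session.planCache.maxSize")
     .internal()
     .version("4.0.0")
     .intConf
     .createWithDefault(5)
   ```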
   
   ### Why are the changes needed?
   
   These two confs are not expected to be used under normal circumstances, and 
we don't need to document them on the Spark Configuration reference page.
   
   ### Does this PR introduce _any_ user-facing change?
   
   No.
   
   
   ### How was this patch tested?
   
   Existing tests.
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   No.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48238][BUILD][YARN] Replace AmIpFilter with re-implemented YarnAMIpFilter [spark]

2024-05-17 Thread via GitHub


pan3793 commented on PR #46611:
URL: https://github.com/apache/spark/pull/46611#issuecomment-2117258617

   cc @cloud-fan @dongjoon-hyun @LuciferYang @srowen 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[PR] [SPARK-48323][SQL] DB2: Map BooleanType to BOOLEAN instead of CHAR(1) [spark]

2024-05-17 Thread via GitHub


yaooqinn opened a new pull request, #46637:
URL: https://github.com/apache/spark/pull/46637

   
   
   ### What changes were proposed in this pull request?
   
   
   This PR maps BooleanType to BOOLEAN instead of CHAR(1) when writing DB2
tables. Users can restore the old behavior by setting
`spark.sql.legacy.db2.booleanTypeMapping.enabled` to `true`.
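
   A hedged sketch of what the change means for JDBC writes (the connection
options and data below are placeholders, not part of this PR):
   ```scala
   import org.apache.spark.sql.SparkSession

   val spark = SparkSession.builder().master("local[*]").getOrCreate()
   import spark.implicits._

   // With the new default (false), a BooleanType column is created as DB2 BOOLEAN;
   // setting the legacy conf to true restores the old CHAR(1) mapping.
   spark.conf.set("spark.sql.legacy.db2.booleanTypeMapping.enabled", "false")

   val df = Seq((1, true), (2, false)).toDF("id", "flag")
   df.write
     .format("jdbc")
     .option("url", "jdbc:db2://db2host:50000/testdb") // placeholder connection info
     .option("dbtable", "flags")
     .option("user", "dbuser")
     .option("password", "secret")
     .save()
   ```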
   
   
   ### Why are the changes needed?
   
   DB2 has supported BOOLEAN since v9.7, which is already EOL, so it is
reasonable to map BooleanType to BOOLEAN.
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   Yes. `spark.sql.legacy.db2.booleanTypeMapping.enabled` is provided to
restore the previous CHAR(1) mapping.
   ### How was this patch tested?
   
   new tests
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   no


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48321][CONNECT][TESTS] Avoid using deprecated methods in dsl [spark]

2024-05-17 Thread via GitHub


zhengruifeng commented on PR #46635:
URL: https://github.com/apache/spark/pull/46635#issuecomment-2117212987

   merged to master


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48321][CONNECT][TESTS] Avoid using deprecated methods in dsl [spark]

2024-05-17 Thread via GitHub


zhengruifeng closed pull request #46635: [SPARK-48321][CONNECT][TESTS] Avoid 
using deprecated methods in dsl
URL: https://github.com/apache/spark/pull/46635


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47972][SQL] Restrict CAST expression for collations [spark]

2024-05-17 Thread via GitHub


mihailom-db commented on PR #46474:
URL: https://github.com/apache/spark/pull/46474#issuecomment-2117197531

   So in summary, SQL will be fine, but we also need to resolve the DataFrame
API and think of the best unified solution.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46841][SQL] Add collation support for ICU locales and collation specifiers [spark]

2024-05-17 Thread via GitHub


nikolamand-db commented on code in PR #46180:
URL: https://github.com/apache/spark/pull/46180#discussion_r1604679971


##
common/unsafe/src/main/java/org/apache/spark/sql/catalyst/util/CollationFactory.java:
##
@@ -117,76 +119,445 @@ public Collation(
 }
 
 /**
- * Constructor with comparators that are inherited from the given collator.
+ * collation id (32-bit integer) layout:
+ * bit 31:0 = predefined collation, 1 = user-defined collation
+ * bit 30-29: 00 = utf8-binary, 01 = ICU, 10 = indeterminate (without spec 
implementation)
+ * bit 28:0 for utf8-binary / 0 = case-sensitive, 1 = case-insensitive 
for ICU
+ * bit 27:0 for utf8-binary / 0 = accent-sensitive, 1 = 
accent-insensitive for ICU
+ * bit 26-25: zeroes, reserved for punctuation sensitivity
+ * bit 24-23: zeroes, reserved for first letter preference
+ * bit 22-21: 00 = unspecified, 01 = to-lower, 10 = to-upper
+ * bit 20-19: zeroes, reserved for space trimming
+ * bit 18-17: zeroes, reserved for version
+ * bit 16-12: zeroes
+ * bit 11-0:  zeroes for utf8-binary / locale id for ICU
  */
-public Collation(
-String collationName,
-Collator collator,
-String version,
-boolean supportsBinaryEquality,
-boolean supportsBinaryOrdering,
-boolean supportsLowercaseEquality) {
-  this(
-collationName,
-collator,
-(s1, s2) -> collator.compare(s1.toString(), s2.toString()),
-version,
-s -> (long)collator.getCollationKey(s.toString()).hashCode(),
-supportsBinaryEquality,
-supportsBinaryOrdering,
-supportsLowercaseEquality);
+private abstract static class CollationSpec {
+  protected enum ImplementationProvider {
+UTF8_BINARY, ICU, INDETERMINATE
+  }
+
+  protected enum CaseSensitivity {
+CS, CI
+  }
+
+  protected enum AccentSensitivity {
+AS, AI
+  }
+
+  protected enum CaseConversion {
+UNSPECIFIED, LCASE, UCASE
+  }
+
+  protected static final int IMPLEMENTATION_PROVIDER_OFFSET = 29;
+  protected static final int IMPLEMENTATION_PROVIDER_MASK = 0b11;
+  protected static final int CASE_SENSITIVITY_OFFSET = 28;
+  protected static final int CASE_SENSITIVITY_MASK = 0b1;
+  protected static final int ACCENT_SENSITIVITY_OFFSET = 27;
+  protected static final int ACCENT_SENSITIVITY_MASK = 0b1;
+  protected static final int CASE_CONVERSION_OFFSET = 21;
+  protected static final int CASE_CONVERSION_MASK = 0b11;
+  protected static final int LOCALE_OFFSET = 0;
+  protected static final int LOCALE_MASK = 0x0FFF;
+
+  protected static final int INDETERMINATE_COLLATION_ID =
+ImplementationProvider.INDETERMINATE.ordinal() << 
IMPLEMENTATION_PROVIDER_OFFSET;
+
+  protected final CaseSensitivity caseSensitivity;
+  protected final AccentSensitivity accentSensitivity;
+  protected final CaseConversion caseConversion;
+  protected final String locale;
+  protected final int collationId;
+
+  protected CollationSpec(
+  String locale,
+  CaseSensitivity caseSensitivity,
+  AccentSensitivity accentSensitivity,
+  CaseConversion caseConversion) {
+this.locale = locale;
+this.caseSensitivity = caseSensitivity;
+this.accentSensitivity = accentSensitivity;
+this.caseConversion = caseConversion;
+this.collationId = getCollationId();
+  }
+
+  private static final Map collationMap = new 
ConcurrentHashMap<>();
+
+  public static Collation fetchCollation(int collationId) throws 
SparkException {
+if (collationId == UTF8_BINARY_COLLATION_ID) {
+  return CollationSpecUTF8Binary.UTF8_BINARY_COLLATION;
+} else if (collationMap.containsKey(collationId)) {
+  return collationMap.get(collationId);
+} else {
+  CollationSpec spec;
+  int implementationProviderOrdinal =
+(collationId >> IMPLEMENTATION_PROVIDER_OFFSET) & 
IMPLEMENTATION_PROVIDER_MASK;
+  if (implementationProviderOrdinal >= 
ImplementationProvider.values().length) {
+throw SparkException.internalError("Invalid collation 
implementation provider");
+  } else {
+ImplementationProvider implementationProvider = 
ImplementationProvider.values()[
+  implementationProviderOrdinal];
+if (implementationProvider == ImplementationProvider.UTF8_BINARY) {
+  spec = CollationSpecUTF8Binary.fromCollationId(collationId);
+} else if (implementationProvider == ImplementationProvider.ICU) {
+  spec = CollationSpecICU.fromCollationId(collationId);
+} else {
+  throw SparkException.internalError("Cannot instantiate 
indeterminate collation");
+}
+Collation collation = spec.buildCollation();
+   
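
   As an aside, the bit layout documented in the diff above can be decoded with a
small standalone sketch (illustrative only; it reuses the offsets and masks shown
in the diff and is not the actual CollationFactory code):
   ```scala
   // Hedged sketch: unpack the fields of a collation id per the documented layout.
   object CollationIdLayout {
     val ImplementationProviderOffset = 29
     val ImplementationProviderMask = 0x3
     val CaseSensitivityOffset = 28
     val CaseSensitivityMask = 0x1
     val AccentSensitivityOffset = 27
     val AccentSensitivityMask = 0x1
     val CaseConversionOffset = 21
     val CaseConversionMask = 0x3
     val LocaleMask = 0x0FFF

     def decode(collationId: Int): Map[String, Int] = Map(
       "implementationProvider" ->
         ((collationId >> ImplementationProviderOffset) & ImplementationProviderMask),
       "caseSensitivity" -> ((collationId >> CaseSensitivityOffset) & CaseSensitivityMask),
       "accentSensitivity" -> ((collationId >> AccentSensitivityOffset) & AccentSensitivityMask),
       "caseConversion" -> ((collationId >> CaseConversionOffset) & CaseConversionMask),
       "localeId" -> (collationId & LocaleMask)
     )
   }
   ```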

Re: [PR] [SPARK-46841][SQL] Add collation support for ICU locales and collation specifiers [spark]

2024-05-17 Thread via GitHub


dbatomic commented on code in PR #46180:
URL: https://github.com/apache/spark/pull/46180#discussion_r1604678110


##
common/unsafe/src/main/java/org/apache/spark/sql/catalyst/util/CollationFactory.java:
##
@@ -245,29 +599,26 @@ public static StringSearch getStringSearch(
* Returns the collation id for the given collation name.
*/
   public static int collationNameToId(String collationName) throws 
SparkException {
-String normalizedName = collationName.toUpperCase();
-if (collationNameToIdMap.containsKey(normalizedName)) {
-  return collationNameToIdMap.get(normalizedName);
-} else {
-  Collation suggestion = Collections.min(List.of(collationTable), 
Comparator.comparingInt(
-c -> UTF8String.fromString(c.collationName).levenshteinDistance(
-  UTF8String.fromString(normalizedName;
-
-  Map params = new HashMap<>();
-  params.put("collationName", collationName);
-  params.put("proposal", suggestion.collationName);
-
-  throw new SparkException(
-"COLLATION_INVALID_NAME", 
SparkException.constructMessageParams(params), null);
-}
+return Collation.CollationSpec.collationNameToId(collationName);
+  }
+
+  public static Collation fetchCollationUnsafe(int collationId) throws 
SparkException {
+return Collation.CollationSpec.fetchCollation(collationId);
   }
 
   public static Collation fetchCollation(int collationId) {
-return collationTable[collationId];
+try {
+  return fetchCollationUnsafe(collationId);
+} catch (SparkException e) {
+  return Collation.CollationSpecUTF8Binary.UTF8_BINARY_COLLATION;
+}

Review Comment:
   TBH, I don't like this either. Swallowing a generic exception and returning
a default seems like a big red flag to me.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47972][SQL] Restrict CAST expression for collations [spark]

2024-05-17 Thread via GitHub


mihailom-db commented on PR #46474:
URL: https://github.com/apache/spark/pull/46474#issuecomment-2117193264

   I would expect that behaviour, but the problem comes with the rewriting you
suggested. When we create expressions, the CAST expression does not remember the
identifier used during parsing, only the type that the identifier produced. I
managed to block the SQL behaviour with the parsing rules, but PySpark, Spark
Connect and the DataFrame API pose a problem. They allow casting with types like
StringType("collation_name") and StringType(), and in Python, as opposed to
Scala, we cannot differentiate StringType() from StringType("UTF8_BINARY");
protobuf always sends the information as StringType("UTF8_BINARY"). Even if we
only block the SQL syntax, a problem arises if someone uses the DataFrame API,
because the session-level default collation can be changed, and then the
collation resolution rules would be impossible to apply: that cast would be
translated to cast(expression, StringType(session_level_default_collation)) and
we have no way of differentiating whether it was created by the STRING
identifier or by STRING COLLATE session_level_default_collation.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46841][SQL] Add collation support for ICU locales and collation specifiers [spark]

2024-05-17 Thread via GitHub


dbatomic commented on code in PR #46180:
URL: https://github.com/apache/spark/pull/46180#discussion_r1604672125


##
common/unsafe/src/main/java/org/apache/spark/sql/catalyst/util/CollationFactory.java:
##
@@ -117,76 +119,445 @@ public Collation(
 }
 
 /**
- * Constructor with comparators that are inherited from the given collator.
+ * collation id (32-bit integer) layout:
+ * bit 31:0 = predefined collation, 1 = user-defined collation
+ * bit 30-29: 00 = utf8-binary, 01 = ICU, 10 = indeterminate (without spec 
implementation)
+ * bit 28:0 for utf8-binary / 0 = case-sensitive, 1 = case-insensitive 
for ICU
+ * bit 27:0 for utf8-binary / 0 = accent-sensitive, 1 = 
accent-insensitive for ICU
+ * bit 26-25: zeroes, reserved for punctuation sensitivity
+ * bit 24-23: zeroes, reserved for first letter preference
+ * bit 22-21: 00 = unspecified, 01 = to-lower, 10 = to-upper
+ * bit 20-19: zeroes, reserved for space trimming
+ * bit 18-17: zeroes, reserved for version
+ * bit 16-12: zeroes
+ * bit 11-0:  zeroes for utf8-binary / locale id for ICU
  */
-public Collation(
-String collationName,
-Collator collator,
-String version,
-boolean supportsBinaryEquality,
-boolean supportsBinaryOrdering,
-boolean supportsLowercaseEquality) {
-  this(
-collationName,
-collator,
-(s1, s2) -> collator.compare(s1.toString(), s2.toString()),
-version,
-s -> (long)collator.getCollationKey(s.toString()).hashCode(),
-supportsBinaryEquality,
-supportsBinaryOrdering,
-supportsLowercaseEquality);
+private abstract static class CollationSpec {
+  protected enum ImplementationProvider {
+UTF8_BINARY, ICU, INDETERMINATE
+  }
+
+  protected enum CaseSensitivity {
+CS, CI
+  }
+
+  protected enum AccentSensitivity {
+AS, AI
+  }
+
+  protected enum CaseConversion {
+UNSPECIFIED, LCASE, UCASE
+  }
+
+  protected static final int IMPLEMENTATION_PROVIDER_OFFSET = 29;
+  protected static final int IMPLEMENTATION_PROVIDER_MASK = 0b11;
+  protected static final int CASE_SENSITIVITY_OFFSET = 28;
+  protected static final int CASE_SENSITIVITY_MASK = 0b1;
+  protected static final int ACCENT_SENSITIVITY_OFFSET = 27;
+  protected static final int ACCENT_SENSITIVITY_MASK = 0b1;
+  protected static final int CASE_CONVERSION_OFFSET = 21;
+  protected static final int CASE_CONVERSION_MASK = 0b11;
+  protected static final int LOCALE_OFFSET = 0;
+  protected static final int LOCALE_MASK = 0x0FFF;
+
+  protected static final int INDETERMINATE_COLLATION_ID =
+ImplementationProvider.INDETERMINATE.ordinal() << 
IMPLEMENTATION_PROVIDER_OFFSET;
+
+  protected final CaseSensitivity caseSensitivity;
+  protected final AccentSensitivity accentSensitivity;
+  protected final CaseConversion caseConversion;
+  protected final String locale;
+  protected final int collationId;
+
+  protected CollationSpec(
+  String locale,
+  CaseSensitivity caseSensitivity,
+  AccentSensitivity accentSensitivity,
+  CaseConversion caseConversion) {
+this.locale = locale;
+this.caseSensitivity = caseSensitivity;
+this.accentSensitivity = accentSensitivity;
+this.caseConversion = caseConversion;
+this.collationId = getCollationId();
+  }
+
+  private static final Map collationMap = new 
ConcurrentHashMap<>();
+
+  public static Collation fetchCollation(int collationId) throws 
SparkException {
+if (collationId == UTF8_BINARY_COLLATION_ID) {
+  return CollationSpecUTF8Binary.UTF8_BINARY_COLLATION;
+} else if (collationMap.containsKey(collationId)) {
+  return collationMap.get(collationId);
+} else {
+  CollationSpec spec;
+  int implementationProviderOrdinal =
+(collationId >> IMPLEMENTATION_PROVIDER_OFFSET) & 
IMPLEMENTATION_PROVIDER_MASK;
+  if (implementationProviderOrdinal >= 
ImplementationProvider.values().length) {
+throw SparkException.internalError("Invalid collation 
implementation provider");
+  } else {
+ImplementationProvider implementationProvider = 
ImplementationProvider.values()[
+  implementationProviderOrdinal];
+if (implementationProvider == ImplementationProvider.UTF8_BINARY) {
+  spec = CollationSpecUTF8Binary.fromCollationId(collationId);
+} else if (implementationProvider == ImplementationProvider.ICU) {
+  spec = CollationSpecICU.fromCollationId(collationId);
+} else {
+  throw SparkException.internalError("Cannot instantiate 
indeterminate collation");
+}
+Collation collation = spec.buildCollation();
+

Re: [PR] [SPARK-46841][SQL] Add collation support for ICU locales and collation specifiers [spark]

2024-05-17 Thread via GitHub


dbatomic commented on code in PR #46180:
URL: https://github.com/apache/spark/pull/46180#discussion_r1604667645


##
common/unsafe/src/main/java/org/apache/spark/sql/catalyst/util/CollationFactory.java:
##
@@ -117,76 +119,445 @@ public Collation(
 }
 
 /**
- * Constructor with comparators that are inherited from the given collator.
+ * collation id (32-bit integer) layout:
+ * bit 31:0 = predefined collation, 1 = user-defined collation
+ * bit 30-29: 00 = utf8-binary, 01 = ICU, 10 = indeterminate (without spec 
implementation)
+ * bit 28:0 for utf8-binary / 0 = case-sensitive, 1 = case-insensitive 
for ICU
+ * bit 27:0 for utf8-binary / 0 = accent-sensitive, 1 = 
accent-insensitive for ICU
+ * bit 26-25: zeroes, reserved for punctuation sensitivity
+ * bit 24-23: zeroes, reserved for first letter preference
+ * bit 22-21: 00 = unspecified, 01 = to-lower, 10 = to-upper
+ * bit 20-19: zeroes, reserved for space trimming
+ * bit 18-17: zeroes, reserved for version
+ * bit 16-12: zeroes
+ * bit 11-0:  zeroes for utf8-binary / locale id for ICU
  */
-public Collation(
-String collationName,
-Collator collator,
-String version,
-boolean supportsBinaryEquality,
-boolean supportsBinaryOrdering,
-boolean supportsLowercaseEquality) {
-  this(
-collationName,
-collator,
-(s1, s2) -> collator.compare(s1.toString(), s2.toString()),
-version,
-s -> (long)collator.getCollationKey(s.toString()).hashCode(),
-supportsBinaryEquality,
-supportsBinaryOrdering,
-supportsLowercaseEquality);
+private abstract static class CollationSpec {
+  protected enum ImplementationProvider {
+UTF8_BINARY, ICU, INDETERMINATE
+  }
+
+  protected enum CaseSensitivity {
+CS, CI
+  }
+
+  protected enum AccentSensitivity {
+AS, AI
+  }
+
+  protected enum CaseConversion {
+UNSPECIFIED, LCASE, UCASE
+  }
+
+  protected static final int IMPLEMENTATION_PROVIDER_OFFSET = 29;
+  protected static final int IMPLEMENTATION_PROVIDER_MASK = 0b11;
+  protected static final int CASE_SENSITIVITY_OFFSET = 28;
+  protected static final int CASE_SENSITIVITY_MASK = 0b1;
+  protected static final int ACCENT_SENSITIVITY_OFFSET = 27;
+  protected static final int ACCENT_SENSITIVITY_MASK = 0b1;
+  protected static final int CASE_CONVERSION_OFFSET = 21;
+  protected static final int CASE_CONVERSION_MASK = 0b11;
+  protected static final int LOCALE_OFFSET = 0;
+  protected static final int LOCALE_MASK = 0x0FFF;
+
+  protected static final int INDETERMINATE_COLLATION_ID =
+ImplementationProvider.INDETERMINATE.ordinal() << 
IMPLEMENTATION_PROVIDER_OFFSET;
+
+  protected final CaseSensitivity caseSensitivity;
+  protected final AccentSensitivity accentSensitivity;
+  protected final CaseConversion caseConversion;
+  protected final String locale;
+  protected final int collationId;
+
+  protected CollationSpec(
+  String locale,
+  CaseSensitivity caseSensitivity,
+  AccentSensitivity accentSensitivity,
+  CaseConversion caseConversion) {
+this.locale = locale;
+this.caseSensitivity = caseSensitivity;
+this.accentSensitivity = accentSensitivity;
+this.caseConversion = caseConversion;
+this.collationId = getCollationId();
+  }
+
+  private static final Map collationMap = new 
ConcurrentHashMap<>();
+
+  public static Collation fetchCollation(int collationId) throws 
SparkException {
+if (collationId == UTF8_BINARY_COLLATION_ID) {
+  return CollationSpecUTF8Binary.UTF8_BINARY_COLLATION;
+} else if (collationMap.containsKey(collationId)) {
+  return collationMap.get(collationId);
+} else {
+  CollationSpec spec;
+  int implementationProviderOrdinal =
+(collationId >> IMPLEMENTATION_PROVIDER_OFFSET) & 
IMPLEMENTATION_PROVIDER_MASK;
+  if (implementationProviderOrdinal >= 
ImplementationProvider.values().length) {
+throw SparkException.internalError("Invalid collation 
implementation provider");
+  } else {
+ImplementationProvider implementationProvider = 
ImplementationProvider.values()[
+  implementationProviderOrdinal];
+if (implementationProvider == ImplementationProvider.UTF8_BINARY) {
+  spec = CollationSpecUTF8Binary.fromCollationId(collationId);
+} else if (implementationProvider == ImplementationProvider.ICU) {
+  spec = CollationSpecICU.fromCollationId(collationId);
+} else {
+  throw SparkException.internalError("Cannot instantiate 
indeterminate collation");
+}
+Collation collation = spec.buildCollation();
+

Re: [PR] [SPARK-47972][SQL] Restrict CAST expression for collations [spark]

2024-05-17 Thread via GitHub


srielau commented on PR #46474:
URL: https://github.com/apache/spark/pull/46474#issuecomment-2117161101

   > @stefankandic SQL Standard does not support for casting to collated 
strings. I agree with the message thing. The standard only allows for CAST(1 AS 
STRING) COLLATE UNICODE, so that is what we were trying to achieve with this 
restriction on cast expression. The problem is that we have Spark Connect which 
can call col.cast(StringType(collation_name)) or col.cast("STRING COLLATE 
UNICODE") and we do not have an easy way of differentiating StringType() from 
type created like StringType(UTF8_BINARY). In order to preserve meaning of cast 
to STRING as default, I would need to add this flag. Another solution is to 
always treat user defined casts as implicit priority. @srielau what do you 
think of this type of behaviour for user defined casts?
   
   Let me see if I get this right:
   If we wanted to support CAST(.. AS STRING COLLATE ...), this would naturally
result in a collation with IMPLICIT priority.
   (A CAST(.. AS STRING) would be DEFAULT, right?)
   The alternative would be to (semantically) rewrite it to CAST(.. AS STRING)
COLLATE , and treat it as EXPLICIT.
   
   
   Right?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48175][SQL][PYTHON] Store collation information in metadata and not in type for SER/DE [spark]

2024-05-17 Thread via GitHub


stefankandic commented on code in PR #46280:
URL: https://github.com/apache/spark/pull/46280#discussion_r1604650741


##
common/unsafe/src/main/java/org/apache/spark/sql/catalyst/util/CollationFactory.java:
##
@@ -36,11 +36,45 @@
  * Provides functionality to the UTF8String object which respects defined 
collation settings.
  */
 public final class CollationFactory {
+
+  /**
+   * Identifier for single a collation.
+   */
+  public static class CollationIdentifier {
+public final String provider;
+public final String name;
+public final String version;

Review Comment:
   that makes sense



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48175][SQL][PYTHON] Store collation information in metadata and not in type for SER/DE [spark]

2024-05-17 Thread via GitHub


stefankandic commented on code in PR #46280:
URL: https://github.com/apache/spark/pull/46280#discussion_r1604650376


##
common/unsafe/src/main/java/org/apache/spark/sql/catalyst/util/CollationFactory.java:
##
@@ -36,11 +36,57 @@
  * Provides functionality to the UTF8String object which respects defined 
collation settings.
  */
 public final class CollationFactory {
+
+  /**
+   * Identifier for single a collation.
+   */
+  public static class CollationIdentifier {
+public final String provider;
+public final String name;
+public final String version;
+
+public CollationIdentifier(String provider, String collationName, String 
version) {
+  this.provider = provider;
+  this.name = collationName;
+  this.version = version;
+}
+
+public static CollationIdentifier fromString(String identifier) {
+  long numDots = identifier.chars().filter(ch -> ch == '.').count();
+  assert(numDots > 0);
+
+  if (numDots == 1) {
+String[] parts = identifier.split("\\.", 2);
+return new CollationIdentifier(parts[0], parts[1], null);
+  }
+
+  String[] parts = identifier.split("\\.", 3);
+  return new CollationIdentifier(parts[0], parts[1], parts[2]);
+}
+
+@Override
+public String toString() {
+  if (version != null) {
+return String.format("%s.%s.%s", provider, name, version);
+  }
+
+  return toStringWithoutVersion();
+}
+
+/**
+ * Returns the identifier's string value without the version.
+ */
+public String toStringWithoutVersion() {

Review Comment:
   added



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48000][SQL] Enable hash join support for all collations (StringType) [spark]

2024-05-17 Thread via GitHub


dbatomic commented on code in PR #46599:
URL: https://github.com/apache/spark/pull/46599#discussion_r1604648152


##
sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala:
##
@@ -768,7 +768,7 @@ class CollationSuite extends DatasourceV2SQLBase with 
AdaptiveSparkPlanHelper {
 })
   }
 
-  test("hash based joins not allowed for non-binary collated strings") {
+  test("hash based joins are also allowed for non-binary collated strings") {

Review Comment:
   Feel free to remove this test if `hash join should be used for collated 
strings` is enough.
   
   In tests we shouldn't care about history - i.e. it is not important that 
once upon a time we didn't support hash joins and now we do. Tests should just 
assert that current behaviour is correct.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48000][SQL] Enable hash join support for all collations (StringType) [spark]

2024-05-17 Thread via GitHub


dbatomic commented on code in PR #46599:
URL: https://github.com/apache/spark/pull/46599#discussion_r1604643347


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CollationKey.scala:
##
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, 
ExprCode}
+import org.apache.spark.sql.catalyst.util.CollationFactory
+import org.apache.spark.sql.internal.types.StringTypeAnyCollation
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+case class CollationKey(expr: Expression) extends UnaryExpression with 
ExpectsInputTypes {
+  override def inputTypes: Seq[AbstractDataType] = Seq(StringTypeAnyCollation)
+  override def dataType: DataType = BinaryType
+
+  final lazy val collationId: Int = expr.dataType match {
+case st: StringType =>
+  st.collationId
+  }
+
+  override def nullSafeEval(input: Any): Any = input match {
+case str: UTF8String =>
+  CollationFactory.getCollationKeyBytes(str, collationId)
+case _ =>

Review Comment:
   I think that it doesn't make sense to ever return None here.
   
   You have a guarantee that the input is of a string type, so you can just do
an explicit cast:
   `CollationFactory.getCollationKeyBytes(input.asInstanceOf[UTF8String], 
collationId)`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47972][SQL] Restrict CAST expression for collations [spark]

2024-05-17 Thread via GitHub


mihailom-db commented on PR #46474:
URL: https://github.com/apache/spark/pull/46474#issuecomment-2117149071

   @stefankandic The SQL Standard does not support casting to collated strings.
I agree with the message thing. The standard only allows for CAST(1 AS STRING)
COLLATE UNICODE, so that is what we were trying to achieve with this restriction
on the cast expression. The problem is that we have Spark Connect, which can
call col.cast(StringType(collation_name)) or col.cast("STRING COLLATE UNICODE"),
and we do not have an easy way of differentiating StringType() from a type
created like StringType(UTF8_BINARY). In order to preserve the meaning of a cast
to STRING as the default, I would need to add this flag. Another solution is to
always treat user-defined casts as implicit priority. @srielau what do you think
of this type of behaviour for user-defined casts?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48000][SQL] Enable hash join support for all collations (StringType) [spark]

2024-05-17 Thread via GitHub


dbatomic commented on code in PR #46599:
URL: https://github.com/apache/spark/pull/46599#discussion_r1604640371


##
sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala:
##
@@ -778,25 +778,25 @@ class CollationSuite extends DatasourceV2SQLBase with 
AdaptiveSparkPlanHelper {
 val df1 = spark.createDataFrame(sparkContext.parallelize(in), schema)
 
 // Binary collations are allowed to use hash join.
-assert(collectFirst(
-  df1.hint("broadcast").join(df1, df1("col_binary") === df1("col_binary"))
-.queryExecution.executedPlan) {
+val bJoin = df1.hint("broadcast").join(df1, df1("col_binary") === 
df1("col_binary"))
+val binaryJoinPlan = bJoin.queryExecution.executedPlan
+assert(collectFirst(binaryJoinPlan) {
   case _: BroadcastHashJoinExec => ()
 }.nonEmpty)
 
-// Even with hint broadcast, hash join is not used for non-binary collated 
strings.
-assert(collectFirst(
-  df1.hint("broadcast").join(df1, df1("col_non_binary") === 
df1("col_non_binary"))
-.queryExecution.executedPlan) {
+// Hash join is also used for non-binary collated strings.
+val nbJoin = df1.hint("broadcast").join(df1, df1("col_non_binary") === 
df1("col_non_binary"))
+val non_binaryJoinPlan = nbJoin.queryExecution.executedPlan
+assert(collectFirst(non_binaryJoinPlan) {
   case _: BroadcastHashJoinExec => ()
-}.isEmpty)
+}.nonEmpty)
+// This is possible because CollationKey is injected into the plan.
+assert(non_binaryJoinPlan.toString().contains("collationkey"))

Review Comment:
   Can you explicitly do the check instead of toString + contains?
   
   e.g. once you get into `BroadcastHashJoinExec` you should be able to get the 
key.
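
   A hedged sketch of what the explicit check could look like, building on the
variables from the test in the diff above (the exact accessors on
`BroadcastHashJoinExec` are an assumption of this sketch):
   ```scala
   // Hedged sketch: assert that the join keys were rewritten to CollationKey,
   // instead of string-matching the plan's toString output.
   val hashJoin = collectFirst(non_binaryJoinPlan) {
     case join: BroadcastHashJoinExec => join
   }.get
   val usesCollationKey = (hashJoin.leftKeys ++ hashJoin.rightKeys).exists { key =>
     key.collectFirst { case ck: CollationKey => ck }.isDefined
   }
   assert(usesCollationKey)
   ```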



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47972][SQL] Restrict CAST expression for collations [spark]

2024-05-17 Thread via GitHub


stefankandic commented on code in PR #46474:
URL: https://github.com/apache/spark/pull/46474#discussion_r1604633605


##
sql/api/src/main/scala/org/apache/spark/sql/catalyst/parser/DataTypeAstBuilder.scala:
##
@@ -74,7 +74,10 @@ class DataTypeAstBuilder extends 
SqlBaseParserBaseVisitor[AnyRef] {
   case (TIMESTAMP_LTZ, Nil) => TimestampType
   case (STRING, Nil) =>
 typeCtx.children.asScala.toSeq match {
-  case Seq(_) => SqlApiConf.get.defaultStringType
+  case Seq(_) =>
+val st = SqlApiConf.get.defaultStringType
+st.createdAsNonCollated = true

Review Comment:
   can we avoid mutating the state of the defaultStringType?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[PR] [SPARK-48322][SQL][CONNECT][PYTHON] Drop internal metadata in `DataFrame.schema` [spark]

2024-05-17 Thread via GitHub


zhengruifeng opened a new pull request, #46636:
URL: https://github.com/apache/spark/pull/46636

   ### What changes were proposed in this pull request?
   Drop internal metadata in `DataFrame.schema`
   
   
   ### Why are the changes needed?
   Internal metadata might be leaked in both Spark Connect and Spark Classic,
   
   e.g. in Spark Classic
   ```
   In [9]: spark.range(10).select(sf.lit(1).alias("key"), 
"id").groupBy("key").agg(sf.max("id")).schema.json()
   Out[9]: 
'{"fields":[{"metadata":{},"name":"key","nullable":false,"type":"integer"},{"metadata":{"__autoGeneratedAlias":"true"},"name":"max(id)","nullable":true,"type":"long"}],"type":"struct"}'
   ```
   
   What makes it worse is that internal metadata may be leaked in different
cases, so we need to add an additional `_drop_meta` in the Pandas APIs to make
assertions work.
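
   For reference, a minimal Scala sketch of one way metadata could be cleared
from schema fields (this drops all field metadata; the actual change only needs
to drop Spark's internal keys, so treat this purely as an illustration):
   ```scala
   import org.apache.spark.sql.types.{Metadata, StructField, StructType}

   // Rebuild the schema with empty metadata on every top-level field.
   def dropFieldMetadata(schema: StructType): StructType =
     StructType(schema.fields.map((f: StructField) => f.copy(metadata = Metadata.empty)))
   ```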
   
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   
   ### How was this patch tested?
   CI
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   No


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48159][SQL] Extending support for collated strings on datetime expressions [spark]

2024-05-17 Thread via GitHub


uros-db commented on code in PR #46618:
URL: https://github.com/apache/spark/pull/46618#discussion_r1604620097


##
sql/core/src/test/scala/org/apache/spark/sql/CollationSQLExpressionsSuite.scala:
##
@@ -1584,6 +1584,234 @@ class CollationSQLExpressionsSuite
 })
   }
 
+  test("CurrentTimeZone expression with collation") {
+// Supported collations
+Seq("UTF8_BINARY", "UTF8_BINARY_LCASE", "UNICODE", 
"UNICODE_CI").foreach(collationName => {
+  val query = "select current_timezone()"
+  // Result
+  withSQLConf(SqlApiConf.DEFAULT_COLLATION -> collationName) {
+val testQuery = sql(query)
+val dataType = StringType(collationName)
+assertResult(dataType)(testQuery.schema.fields.head.dataType)
+  }
+})
+  }
+
+  test("DayName expression with collation") {
+// Supported collations
+Seq("UTF8_BINARY", "UTF8_BINARY_LCASE", "UNICODE", 
"UNICODE_CI").foreach(collationName => {
+  val query = "select dayname(current_date())"
+  // Result
+  withSQLConf(SqlApiConf.DEFAULT_COLLATION -> collationName) {
+val testQuery = sql(query)
+val dataType = StringType(collationName)
+assertResult(dataType)(testQuery.schema.fields.head.dataType)
+  }
+})
+  }
+
+  test("ToUnixTimestamp expression with collation") {
+// Supported collations
+Seq("UTF8_BINARY", "UTF8_BINARY_LCASE", "UNICODE", 
"UNICODE_CI").foreach(collationName => {
+  val query =
+s"""
+  |select to_unix_timestamp(collate('2021-01-01 00:00:00', 
'${collationName}'),
+  |collate('-MM-dd HH:mm:ss', '${collationName}'))
+  |""".stripMargin
+  // Result
+  val testQuery = sql(query)
+  val dataType = LongType
+  val expectedResult = 1609488000L
+  assertResult(dataType)(testQuery.schema.fields.head.dataType)
+  assertResult(expectedResult)(testQuery.collect().head.getLong(0))
+})
+  }
+
+  test("FromUnixTime expression with collation") {
+// Supported collations
+Seq("UTF8_BINARY", "UTF8_BINARY_LCASE", "UNICODE", 
"UNICODE_CI").foreach(collationName => {
+  val query =
+s"""
+  |select from_unixtime(1609488000, collate('-MM-dd HH:mm:ss', 
'${collationName}'))
+  |""".stripMargin
+  // Result
+  withSQLConf(SqlApiConf.DEFAULT_COLLATION -> collationName) {
+val testQuery = sql(query)
+val dataType = StringType(collationName)
+val expectedResult = "2021-01-01 00:00:00"
+assertResult(dataType)(testQuery.schema.fields.head.dataType)
+assertResult(expectedResult)(testQuery.collect().head.getString(0))
+  }
+})
+  }
+
+  test("NextDay expression with collation") {
+// Supported collations
+Seq("UTF8_BINARY", "UTF8_BINARY_LCASE", "UNICODE", 
"UNICODE_CI").foreach(collationName => {
+  val query =
+s"""
+  |select next_day('2015-01-14', collate('TU', '${collationName}'))
+  |""".stripMargin
+  // Result
+  withSQLConf(SqlApiConf.DEFAULT_COLLATION -> collationName) {
+val testQuery = sql(query)
+val dataType = DateType
+val expectedResult = "2015-01-20"
+assertResult(dataType)(testQuery.schema.fields.head.dataType)
+
assertResult(expectedResult)(testQuery.collect().head.getDate(0).toString)
+  }
+})
+  }
+
+  test("FromUTCTimestamp expression with collation") {
+// Supported collations
+Seq("UTF8_BINARY", "UTF8_BINARY_LCASE", "UNICODE", 
"UNICODE_CI").foreach(collationName => {
+  val query =
+s"""
+  |select from_utc_timestamp(collate('2016-08-31', '${collationName}'),
+  |collate('Asia/Seoul', '${collationName}'))
+  |""".stripMargin
+  // Result
+  val testQuery = sql(query)
+  val dataType = TimestampType
+  val expectedResult = "2016-08-31 09:00:00.0"
+  assertResult(dataType)(testQuery.schema.fields.head.dataType)
+  
assertResult(expectedResult)(testQuery.collect().head.getTimestamp(0).toString)
+})
+  }
+
+  test("ToUTCTimestamp expression with collation") {
+// Supported collations
+Seq("UTF8_BINARY", "UTF8_BINARY_LCASE", "UNICODE", 
"UNICODE_CI").foreach(collationName => {
+  val query =
+s"""
+  |select to_utc_timestamp(collate('2016-08-31 09:00:00', 
'${collationName}'),
+  |collate('Asia/Seoul', '${collationName}'))
+  |""".stripMargin
+  // Result
+  val testQuery = sql(query)
+  val dataType = TimestampType
+  val expectedResult = "2016-08-31 00:00:00.0"
+  assertResult(dataType)(testQuery.schema.fields.head.dataType)
+  
assertResult(expectedResult)(testQuery.collect().head.getTimestamp(0).toString)
+})
+  }
+
+  test("ParseToDate expression with collation") {
+// Supported collations
+Seq("UTF8_BINARY", "UTF8_BINARY_LCASE", "UNICODE", 
"UNICODE_CI").foreach(collationName => {
+  val query =
+s"""
+  |select 

Re: [PR] [SPARK-47972][SQL] Restrict CAST expression for collations [spark]

2024-05-17 Thread via GitHub


stefankandic commented on PR #46474:
URL: https://github.com/apache/spark/pull/46474#issuecomment-2117114028

   Why do we want to do this? Should it be a parser error? Also, I don't think 
the error message is very clear - we should at least let the user know how to 
perform the cast to another collation.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48159][SQL] Extending support for collated strings on datetime expressions [spark]

2024-05-17 Thread via GitHub


uros-db commented on code in PR #46618:
URL: https://github.com/apache/spark/pull/46618#discussion_r1604614789


##
sql/core/src/test/scala/org/apache/spark/sql/CollationSQLExpressionsSuite.scala:
##
@@ -1584,6 +1584,234 @@ class CollationSQLExpressionsSuite
 })
   }
 
+  test("CurrentTimeZone expression with collation") {
+// Supported collations
+Seq("UTF8_BINARY", "UTF8_BINARY_LCASE", "UNICODE", 
"UNICODE_CI").foreach(collationName => {

Review Comment:
   since we're now using this `Seq("UTF8_BINARY", "UTF8_BINARY_LCASE", 
"UNICODE", "UNICODE_CI")` a lot, let's separate it out and call it something 
like `testCollationsSeq`
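
   For example, a sketch of the suggested extraction, mirroring the test shown
in the diff (the name `testCollationsSeq` follows the suggestion above):
   ```scala
   private val testCollationsSeq = Seq("UTF8_BINARY", "UTF8_BINARY_LCASE", "UNICODE", "UNICODE_CI")

   test("CurrentTimeZone expression with collation") {
     testCollationsSeq.foreach { collationName =>
       withSQLConf(SqlApiConf.DEFAULT_COLLATION -> collationName) {
         val testQuery = sql("select current_timezone()")
         assertResult(StringType(collationName))(testQuery.schema.fields.head.dataType)
       }
     }
   }
   ```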



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48159][SQL] Extending support for collated strings on datetime expressions [spark]

2024-05-17 Thread via GitHub


uros-db commented on code in PR #46618:
URL: https://github.com/apache/spark/pull/46618#discussion_r1604612518


##
sql/core/src/test/scala/org/apache/spark/sql/CollationSQLExpressionsSuite.scala:
##
@@ -1584,6 +1584,234 @@ class CollationSQLExpressionsSuite
 })
   }
 
+  test("CurrentTimeZone expression with collation") {
+// Supported collations
+Seq("UTF8_BINARY", "UTF8_BINARY_LCASE", "UNICODE", 
"UNICODE_CI").foreach(collationName => {
+  val query = "select current_timezone()"
+  // Result

Review Comment:
   (goes for other similar tests too)



##
sql/core/src/test/scala/org/apache/spark/sql/CollationSQLExpressionsSuite.scala:
##
@@ -1584,6 +1584,234 @@ class CollationSQLExpressionsSuite
 })
   }
 
+  test("CurrentTimeZone expression with collation") {
+// Supported collations
+Seq("UTF8_BINARY", "UTF8_BINARY_LCASE", "UNICODE", 
"UNICODE_CI").foreach(collationName => {
+  val query = "select current_timezone()"
+  // Result
+  withSQLConf(SqlApiConf.DEFAULT_COLLATION -> collationName) {
+val testQuery = sql(query)
+val dataType = StringType(collationName)
+assertResult(dataType)(testQuery.schema.fields.head.dataType)
+  }
+})
+  }
+
+  test("DayName expression with collation") {
+// Supported collations
+Seq("UTF8_BINARY", "UTF8_BINARY_LCASE", "UNICODE", 
"UNICODE_CI").foreach(collationName => {
+  val query = "select dayname(current_date())"
+  // Result
+  withSQLConf(SqlApiConf.DEFAULT_COLLATION -> collationName) {
+val testQuery = sql(query)
+val dataType = StringType(collationName)
+assertResult(dataType)(testQuery.schema.fields.head.dataType)
+  }
+})
+  }
+
+  test("ToUnixTimestamp expression with collation") {
+// Supported collations
+Seq("UTF8_BINARY", "UTF8_BINARY_LCASE", "UNICODE", 
"UNICODE_CI").foreach(collationName => {
+  val query =
+s"""
+  |select to_unix_timestamp(collate('2021-01-01 00:00:00', 
'${collationName}'),
+  |collate('-MM-dd HH:mm:ss', '${collationName}'))
+  |""".stripMargin
+  // Result
+  val testQuery = sql(query)
+  val dataType = LongType
+  val expectedResult = 1609488000L
+  assertResult(dataType)(testQuery.schema.fields.head.dataType)
+  assertResult(expectedResult)(testQuery.collect().head.getLong(0))

Review Comment:
   (goes for other similar tests too)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48159][SQL] Extending support for collated strings on datetime expressions [spark]

2024-05-17 Thread via GitHub


uros-db commented on code in PR #46618:
URL: https://github.com/apache/spark/pull/46618#discussion_r1604611590


##
sql/core/src/test/scala/org/apache/spark/sql/CollationSQLExpressionsSuite.scala:
##
@@ -1584,6 +1584,234 @@ class CollationSQLExpressionsSuite
 })
   }
 
+  test("CurrentTimeZone expression with collation") {
+// Supported collations
+Seq("UTF8_BINARY", "UTF8_BINARY_LCASE", "UNICODE", 
"UNICODE_CI").foreach(collationName => {
+  val query = "select current_timezone()"
+  // Result
+  withSQLConf(SqlApiConf.DEFAULT_COLLATION -> collationName) {
+val testQuery = sql(query)
+val dataType = StringType(collationName)
+assertResult(dataType)(testQuery.schema.fields.head.dataType)
+  }
+})
+  }
+
+  test("DayName expression with collation") {
+// Supported collations
+Seq("UTF8_BINARY", "UTF8_BINARY_LCASE", "UNICODE", 
"UNICODE_CI").foreach(collationName => {
+  val query = "select dayname(current_date())"
+  // Result
+  withSQLConf(SqlApiConf.DEFAULT_COLLATION -> collationName) {
+val testQuery = sql(query)
+val dataType = StringType(collationName)
+assertResult(dataType)(testQuery.schema.fields.head.dataType)
+  }
+})
+  }
+
+  test("ToUnixTimestamp expression with collation") {
+// Supported collations
+Seq("UTF8_BINARY", "UTF8_BINARY_LCASE", "UNICODE", 
"UNICODE_CI").foreach(collationName => {
+  val query =
+s"""
+  |select to_unix_timestamp(collate('2021-01-01 00:00:00', 
'${collationName}'),
+  |collate('-MM-dd HH:mm:ss', '${collationName}'))
+  |""".stripMargin
+  // Result
+  val testQuery = sql(query)
+  val dataType = LongType
+  val expectedResult = 1609488000L
+  assertResult(dataType)(testQuery.schema.fields.head.dataType)
+  assertResult(expectedResult)(testQuery.collect().head.getLong(0))

Review Comment:
   use `checkAnswer` instead
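
   A sketch of the suggested assertion style, reusing `query` and the expected
value from the test in the diff above:
   ```scala
   // checkAnswer compares the full result set instead of poking at a single row.
   val testQuery = sql(query)
   assertResult(LongType)(testQuery.schema.fields.head.dataType)
   checkAnswer(testQuery, Row(1609488000L))
   ```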



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



  1   2   >