[GitHub] [spark] pralabhkumar commented on a diff in pull request #36701: [SPARK-39179][PYTHON][TESTS] Improve the test coverage for pyspark/shuffle.py
pralabhkumar commented on code in PR #36701: URL: https://github.com/apache/spark/pull/36701#discussion_r888632502

## python/pyspark/tests/test_shuffle.py:
@@ -54,6 +63,49 @@ def test_medium_dataset(self):
         self.assertTrue(m.spills >= 1)
         self.assertEqual(sum(sum(v) for k, v in m.items()), sum(range(self.N)) * 3)
 
+    def test_shuffle_data_with_multiple_locations(self):
+        # SPARK-39179: Test shuffling data across multiple locations, and also check
+        # that shuffle locations get randomized
+        with tempfile.TemporaryDirectory() as tempdir1, tempfile.TemporaryDirectory() as tempdir2:
+            os.environ["SPARK_LOCAL_DIRS"] = tempdir1 + "," + tempdir2
+            index_of_tempdir1 = [False, False]
+            for idx in range(10):
+                m = ExternalMerger(self.agg, 20)
+                if m.localdirs[0].startswith(tempdir1):
+                    index_of_tempdir1[0] = True
+                elif m.localdirs[1].startswith(tempdir1):
+                    index_of_tempdir1[1] = True
+                m.mergeValues(self.data)
+                self.assertTrue(m.spills >= 1)
+                self.assertEqual(sum(sum(v) for k, v in m.items()), sum(range(self.N)))
+            self.assertTrue(index_of_tempdir1[0] and (index_of_tempdir1[0] == index_of_tempdir1[1]))
+            del os.environ["SPARK_LOCAL_DIRS"]

Review Comment: @HyukjinKwon, sorry, I didn't completely understand your suggestion. Please help. Originally SPARK_LOCAL_DIRS is not set in the environment (it has no default value). So it is set as part of this test and removed at the end of the test case, so that the rest of the cases work as-is. Let me know if I am missing anything.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
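The cleanup concern discussed above (restoring `SPARK_LOCAL_DIRS` after the test) is commonly handled with `unittest.mock.patch.dict`, which snapshots the environment on entry and restores it on exit even if the test body raises. This is only a sketch of that pattern, not necessarily what the reviewer had in mind:

```python
import os
from unittest.mock import patch

def run_with_temp_local_dirs(dirs):
    # patch.dict snapshots os.environ on entry and restores it on exit --
    # even if the body raises -- so no manual `del os.environ[...]` is needed.
    with patch.dict(os.environ, {"SPARK_LOCAL_DIRS": ",".join(dirs)}):
        return os.environ["SPARK_LOCAL_DIRS"]

os.environ.pop("SPARK_LOCAL_DIRS", None)  # start unset, as in the test under review
print(run_with_temp_local_dirs(["/tmp/a", "/tmp/b"]))  # /tmp/a,/tmp/b
print("SPARK_LOCAL_DIRS" in os.environ)                # False: restored
```

With this pattern, a failing assertion in the middle of the test can no longer leak the variable into later test cases.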
[GitHub] [spark] mridulm commented on a diff in pull request #36734: [SPARK-38987][SHUFFLE] Throw FetchFailedException when merged shuffle blocks are corrupted and spark.shuffle.detectCorrupt is set t
mridulm commented on code in PR #36734: URL: https://github.com/apache/spark/pull/36734#discussion_r888654676

## core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
@@ -1885,6 +1885,16 @@ private[spark] class DAGScheduler(
         mapOutputTracker.
           unregisterMergeResult(shuffleId, reduceId, bmAddress, Option(mapIndex))
       }
+    } else {
+      // Unregister the merge result if there is a FetchFailed event
+      // and it is not a MetadataFetchFailedException, which is signified by bmAddress being null
+      if (bmAddress != null
+        && bmAddress.executorId.equals(BlockManagerId.SHUFFLE_MERGER_IDENTIFIER)) {

Review Comment: nit: Move the `&&` to the previous line.

## core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
@@ -1885,6 +1885,16 @@ private[spark] class DAGScheduler(
         mapOutputTracker.
           unregisterMergeResult(shuffleId, reduceId, bmAddress, Option(mapIndex))
       }
+    } else {
+      // Unregister the merge result if there is a FetchFailed event
+      // and it is not a MetadataFetchFailedException, which is signified by bmAddress being null
+      if (bmAddress != null
+        && bmAddress.executorId.equals(BlockManagerId.SHUFFLE_MERGER_IDENTIFIER)) {
+        assert(pushBasedShuffleEnabled, "Pushed based shuffle needs to " +
+          "be enabled so that merge results are present.")

Review Comment: `Pushed based shuffle` -> `Push based shuffle`. Change the message to something like `Push based shuffle expected to be enabled when handling merge block fetch failure`? To make it clear what condition we are expecting to hold.
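The control flow under review — only unregister a merge result when the failing address belongs to the shuffle merger, and assert that push-based shuffle is on — can be paraphrased in Python. The constant value and function shapes here are illustrative assumptions, not Spark's actual API:

```python
SHUFFLE_MERGER_IDENTIFIER = "shuffle-push-merger"  # assumed constant, for illustration only

def handle_fetch_failure(bm_address, push_based_shuffle_enabled, unregister_merge_result):
    # bm_address is None for metadata fetch failures: nothing to unregister then.
    if bm_address is not None and bm_address["executorId"] == SHUFFLE_MERGER_IDENTIFIER:
        # The assert documents the invariant the reviewer wants spelled out:
        # merge results can only exist when push-based shuffle is enabled.
        assert push_based_shuffle_enabled, (
            "Push based shuffle expected to be enabled when handling "
            "merge block fetch failure")
        unregister_merge_result(bm_address)

calls = []
handle_fetch_failure({"executorId": SHUFFLE_MERGER_IDENTIFIER}, True, calls.append)
handle_fetch_failure(None, True, calls.append)  # metadata failure: ignored
print(len(calls))  # 1
```

A descriptive assertion message like this turns an opaque failure into a statement of the violated invariant, which is the point of the review comment.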
[GitHub] [spark] MaxGekk closed pull request #36708: [SPARK-37623][SQL] Support ANSI Aggregate Function: regr_intercept
MaxGekk closed pull request #36708: [SPARK-37623][SQL] Support ANSI Aggregate Function: regr_intercept URL: https://github.com/apache/spark/pull/36708
[GitHub] [spark] MaxGekk commented on pull request #36708: [SPARK-37623][SQL] Support ANSI Aggregate Function: regr_intercept
MaxGekk commented on PR #36708: URL: https://github.com/apache/spark/pull/36708#issuecomment-1145630828 +1, LGTM. Merging to master. Thank you, @beliefer and @cloud-fan for review.
[GitHub] [spark] MaxGekk closed pull request #36752: [SPARK-39259][SQL][3.3] Evaluate timestamps consistently in subqueries
MaxGekk closed pull request #36752: [SPARK-39259][SQL][3.3] Evaluate timestamps consistently in subqueries URL: https://github.com/apache/spark/pull/36752
[GitHub] [spark] MaxGekk commented on pull request #36752: [SPARK-39259][SQL][3.3] Evaluate timestamps consistently in subqueries
MaxGekk commented on PR #36752: URL: https://github.com/apache/spark/pull/36752#issuecomment-1145620433 +1, LGTM. Merging to 3.3. Thank you, @olaky.
[GitHub] [spark] xuanyuanking opened a new pull request, #36757: [SPARK-39371][DOCS][Core] Review and fix issues in Scala/Java API docs of Core module #36754
xuanyuanking opened a new pull request, #36757: URL: https://github.com/apache/spark/pull/36757

### What changes were proposed in this pull request?
Compare the 3.3.0 API doc with the latest release version 3.2.1. Fix the following issues:
* Add missing Since annotation for new APIs
* Remove the leaking class/object in API doc

### Why are the changes needed?
Improve API docs.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Existing UT.
[GitHub] [spark] sadikovi commented on a diff in pull request #36726: [SPARK-39339][SQL] Support TimestampNTZ type in JDBC data source
sadikovi commented on code in PR #36726: URL: https://github.com/apache/spark/pull/36726#discussion_r887638039

## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala:
@@ -150,6 +150,9 @@ object JdbcUtils extends Logging with SQLConfHelper {
     case StringType => Option(JdbcType("TEXT", java.sql.Types.CLOB))
     case BinaryType => Option(JdbcType("BLOB", java.sql.Types.BLOB))
     case TimestampType => Option(JdbcType("TIMESTAMP", java.sql.Types.TIMESTAMP))
+    // Most of the databases either don't support TIMESTAMP WITHOUT TIME ZONE or map it to
+    // TIMESTAMP type. This will be overwritten in dialects.
+    case TimestampNTZType => Option(JdbcType("TIMESTAMP", java.sql.Types.TIMESTAMP))

Review Comment: This is a common case of treating TIMESTAMP as timestamp without time zone. JDBC dialects can override this setting if need be; for example, SQL Server uses DATETIME instead. I have verified that most of the JDBC data sources work fine with TIMESTAMP. I am going to update the comment to elaborate in more detail.
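The "default mapping plus per-dialect override" approach described above can be sketched generically. All names here (`BASE_TYPE_MAP`, `DIALECT_OVERRIDES`, `jdbc_type_for`) are hypothetical and only illustrate the lookup pattern, not Spark's JdbcUtils API:

```python
# Default Catalyst-type -> (database type name, JDBC type name) mapping.
BASE_TYPE_MAP = {
    "StringType": ("TEXT", "CLOB"),
    "BinaryType": ("BLOB", "BLOB"),
    "TimestampType": ("TIMESTAMP", "TIMESTAMP"),
    # Default: map TIMESTAMP_NTZ to plain TIMESTAMP, as most databases expect.
    "TimestampNTZType": ("TIMESTAMP", "TIMESTAMP"),
}

# A dialect only lists the types it wants to remap; everything else falls through.
DIALECT_OVERRIDES = {
    "sqlserver": {"TimestampNTZType": ("DATETIME", "TIMESTAMP")},
}

def jdbc_type_for(catalyst_type, dialect=None):
    overrides = DIALECT_OVERRIDES.get(dialect, {})
    return overrides.get(catalyst_type, BASE_TYPE_MAP[catalyst_type])

print(jdbc_type_for("TimestampNTZType"))               # ('TIMESTAMP', 'TIMESTAMP')
print(jdbc_type_for("TimestampNTZType", "sqlserver"))  # ('DATETIME', 'TIMESTAMP')
```

The design choice matches the comment in the diff: a sensible default lives in one place, and each dialect pays only for the types it actually needs to change.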
[GitHub] [spark] MaxGekk closed pull request #36714: [SPARK-39320][SQL] Support aggregate function `MEDIAN`
MaxGekk closed pull request #36714: [SPARK-39320][SQL] Support aggregate function `MEDIAN` URL: https://github.com/apache/spark/pull/36714
[GitHub] [spark] AmplabJenkins commented on pull request #36740: [SPARK-39355][SQL] Avoid UnresolvedAttribute.apply throwing ParseException
AmplabJenkins commented on PR #36740: URL: https://github.com/apache/spark/pull/36740#issuecomment-1145590291 Can one of the admins verify this patch?
[GitHub] [spark] AmplabJenkins commented on pull request #36741: [SPARK-39357][SQL] Fix pmCache memory leak caused by IsolatedClassLoader
AmplabJenkins commented on PR #36741: URL: https://github.com/apache/spark/pull/36741#issuecomment-1145590268 Can one of the admins verify this patch?
[GitHub] [spark] AmplabJenkins commented on pull request #36745: [SPARK-39359][SQL] Restrict DEFAULT columns to allowlist of supported data source types
AmplabJenkins commented on PR #36745: URL: https://github.com/apache/spark/pull/36745#issuecomment-1145590247 Can one of the admins verify this patch?
[GitHub] [spark] sadikovi commented on a diff in pull request #36745: [SPARK-39359][SQL] Restrict DEFAULT columns to allowlist of supported data source types
sadikovi commented on code in PR #36745: URL: https://github.com/apache/spark/pull/36745#discussion_r888606358

## sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala:
@@ -41,21 +41,28 @@ import org.apache.spark.sql.types.{MetadataBuilder, StructField, StructType}
  *
  * We can remove this rule once we implement all the catalog functionality in `V2SessionCatalog`.
  */
-class ResolveSessionCatalog(val catalogManager: CatalogManager)
+class ResolveSessionCatalog(val analyzer: Analyzer)
   extends Rule[LogicalPlan] with LookupCatalog {
+  val catalogManager = analyzer.catalogManager

Review Comment: It is up to you whether to change this, but maybe we could move this line below the imports.

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala:
@@ -83,16 +83,26 @@ object ResolveDefaultColumns {
    *
    * @param analyzer used for analyzing the result of parsing the expression stored as text.
    * @param tableSchema represents the names and types of the columns of the statement to process.
+   * @param tableProvider provider of the target table to store default values for, if any.
    * @param statementType name of the statement being processed, such as INSERT; useful for errors.
    * @return a copy of `tableSchema` with field metadata updated with the constant-folded values.
    */
   def constantFoldCurrentDefaultsToExistDefaults(
       analyzer: Analyzer,
       tableSchema: StructType,
+      tableProvider: Option[String],
       statementType: String): StructType = {
     if (SQLConf.get.enableDefaultColumns) {
       val newFields: Seq[StructField] = tableSchema.fields.map { field =>
         if (field.metadata.contains(CURRENT_DEFAULT_COLUMN_METADATA_KEY)) {
+          // Make sure that the target table has a provider that supports default column values.
+          val allowedProviders: Array[String] =
+            SQLConf.get.getConf(SQLConf.DEFAULT_COLUMN_ALLOWED_PROVIDERS)

Review Comment: Maybe we could try to move the code out of the `map` loop? It is not a big deal, but it could matter for large schemas. For example, we can prepare the allowedProviders list and then call `map`.
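The reviewer's point about hoisting the allow-list computation out of the per-field `map` can be shown with a minimal sketch (the `get_conf` helper and field names are hypothetical; a call counter makes the difference visible):

```python
calls = {"n": 0}

def get_conf(key):
    # Stand-in for a config lookup; counts how often it runs.
    calls["n"] += 1
    return "csv,json,parquet,orc"

def check_slow(fields):
    # The reviewer's concern: the config lookup runs once per field inside the map.
    return [f in get_conf("allowedProviders").split(",") for f in fields]

def check_fast(fields):
    # Hoisted: one lookup, reused for every field -- matters for wide schemas.
    allowed = set(get_conf("allowedProviders").split(","))
    return [f in allowed for f in fields]

calls["n"] = 0
check_slow(["csv", "avro", "json"])
print(calls["n"])  # 3
calls["n"] = 0
check_fast(["csv", "avro", "json"])
print(calls["n"])  # 1
```

Both versions return the same booleans; only the number of lookups changes, which is exactly the kind of cleanup the comment asks for.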
[GitHub] [spark] HyukjinKwon commented on a diff in pull request #36660: [SPARK-39284][PS] Implement Groupby.mad
HyukjinKwon commented on code in PR #36660: URL: https://github.com/apache/spark/pull/36660#discussion_r888603854

## python/pyspark/pandas/groupby.py:
@@ -759,6 +759,99 @@ def skew(scol: Column) -> Column:
             bool_to_numeric=True,
         )
 
+    # TODO: 'axis', 'skipna', 'level' parameter should be implemented.
+    def mad(self) -> FrameLike:
+        """
+        Compute mean absolute deviation of groups, excluding missing values.
+
+        .. versionadded:: 3.4.0
+
+        Examples
+        --------
+        >>> df = ps.DataFrame({"A": [1, 2, 1, 1], "B": [True, False, False, True],
+        ...                    "C": [3, 4, 3, 4], "D": ["a", "b", "b", "a"]})
+
+        >>> df.groupby("A").mad()
+              B     C
+        A
+        1  0.44  0.44
+        2  0.00  0.00
+
+        >>> df.B.groupby(df.A).mad()
+        A
+        1    0.44
+        2    0.00
+        Name: B, dtype: float64
+
+        See Also
+        --------
+        pyspark.pandas.Series.groupby
+        pyspark.pandas.DataFrame.groupby
+        """
+        groupkey_names = [SPARK_INDEX_NAME_FORMAT(i) for i in range(len(self._groupkeys))]
+        groupkey_scols = [s.alias(name) for s, name in zip(self._groupkeys_scols, groupkey_names)]
+
+        agg_columns = []
+        for psser in self._agg_columns:
+            if isinstance(psser.spark.data_type, BooleanType):
+                agg_columns.append(psser.astype(int))
+            elif isinstance(psser.spark.data_type, NumericType):
+                agg_columns.append(psser)
+
+        sdf = self._psdf._internal.spark_frame.select(
+            *groupkey_scols, *[psser.spark.column for psser in agg_columns]
+        )
+
+        internal = InternalFrame(
+            spark_frame=sdf,
+            index_spark_columns=[scol_for(sdf, col) for col in groupkey_names],
+            index_names=[psser._column_label for psser in self._groupkeys],
+            index_fields=[
+                psser._internal.data_fields[0].copy(name=name)
+                for psser, name in zip(self._groupkeys, groupkey_names)
+            ],
+            data_spark_columns=[
+                scol_for(sdf, psser._internal.data_spark_column_names[0]) for psser in agg_columns
+            ],
+            column_labels=[psser._column_label for psser in agg_columns],
+            data_fields=[psser._internal.data_fields[0] for psser in agg_columns],
+            column_label_names=self._psdf._internal.column_label_names,
+        )
+        psdf: DataFrame = DataFrame(internal)
+
+        if len(psdf._internal.column_labels) > 0:

Review Comment: Maybe let's add some comments here.
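As a sanity check of the docstring example above: groupby-`mad` is the per-group mean of absolute deviations from the group mean, with booleans cast to int first. A pure-Python sketch reproduces the 0.44 values:

```python
def group_mad(keys, values):
    # For each group: mean of |x - group mean|, booleans cast to int first.
    groups = {}
    for k, v in zip(keys, values):
        groups.setdefault(k, []).append(int(v))
    result = {}
    for k, vs in groups.items():
        mean = sum(vs) / len(vs)
        result[k] = sum(abs(v - mean) for v in vs) / len(vs)
    return result

A = [1, 2, 1, 1]
B = [True, False, False, True]
C = [3, 4, 3, 4]
print({k: round(v, 2) for k, v in group_mad(A, B).items()})  # {1: 0.44, 2: 0.0}
print({k: round(v, 2) for k, v in group_mad(A, C).items()})  # {1: 0.44, 2: 0.0}
```

Group 1 of B is [1, 0, 1] with mean 2/3, so its MAD is 4/9 ≈ 0.44, matching the expected docstring output; single-row group 2 deviates from itself by zero.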
[GitHub] [spark] sadikovi commented on a diff in pull request #36745: [SPARK-39359][SQL] Restrict DEFAULT columns to allowlist of supported data source types
sadikovi commented on code in PR #36745: URL: https://github.com/apache/spark/pull/36745#discussion_r888603145

## sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala:
@@ -122,7 +122,7 @@ abstract class SessionCatalogSuite extends AnalysisTest with Eventually {
   }
 
   test("create table with default columns") {
-    withBasicCatalog { catalog =>
+    if (!isHiveExternalCatalog) withBasicCatalog { catalog =>

Review Comment: What happens if we remove `if (!isHiveExternalCatalog)`? What is the test failure in that case?
[GitHub] [spark] HyukjinKwon commented on a diff in pull request #36660: [SPARK-39284][PS] Implement Groupby.mad
HyukjinKwon commented on code in PR #36660: URL: https://github.com/apache/spark/pull/36660#discussion_r888603267

## python/pyspark/pandas/groupby.py:
@@ -805,7 +874,7 @@ def all(self, skipna: bool = True) -> FrameLike:
         5  False
         """
         groupkey_names = [SPARK_INDEX_NAME_FORMAT(i) for i in range(len(self._groupkeys))]
-        internal, sdf = self._prepare_reduce(groupkey_names)
+        internal, _, sdf = self._prepare_reduce(groupkey_names)

Review Comment: Hm, I can't follow this change. `_prepare_reduce` seems to return an internal frame and a dataframe; what's the one in the middle?
[GitHub] [spark] wangyum commented on pull request #36750: [SPARK-29260][SQL] Support `ALTER DATABASE SET LOCATION` if HMS supports
wangyum commented on PR #36750: URL: https://github.com/apache/spark/pull/36750#issuecomment-1145534550 Merged to master.
[GitHub] [spark] wangyum closed pull request #36750: [SPARK-29260][SQL] Support `ALTER DATABASE SET LOCATION` if HMS supports
wangyum closed pull request #36750: [SPARK-29260][SQL] Support `ALTER DATABASE SET LOCATION` if HMS supports URL: https://github.com/apache/spark/pull/36750
[GitHub] [spark] HyukjinKwon closed pull request #36736: [SPARK-39351][SQL] SHOW CREATE TABLE should redact properties
HyukjinKwon closed pull request #36736: [SPARK-39351][SQL] SHOW CREATE TABLE should redact properties URL: https://github.com/apache/spark/pull/36736
[GitHub] [spark] HyukjinKwon commented on pull request #36736: [SPARK-39351][SQL] SHOW CREATE TABLE should redact properties
HyukjinKwon commented on PR #36736: URL: https://github.com/apache/spark/pull/36736#issuecomment-1145530299 Merged to master.
[GitHub] [spark] HyukjinKwon opened a new pull request, #36756: [SPARK-39369][INFRA] Increase the memory for building from 4096 to 5120MB in AppVeyor
HyukjinKwon opened a new pull request, #36756: URL: https://github.com/apache/spark/pull/36756

### What changes were proposed in this pull request?
https://ci.appveyor.com/project/ApacheSoftwareFoundation/spark/builds/43740704

The AppVeyor build is failing because of a lack of memory. We should increase the limit to make the build pass.

### Why are the changes needed?
To make the build pass.

### Does this PR introduce _any_ user-facing change?
No, dev/test-only.

### How was this patch tested?
CI in this PR should test it out.
[GitHub] [spark] wangyum commented on a diff in pull request #36755: [SPARK-39368][SQL] Move `RewritePredicateSubquery` into `InjectRuntimeFilter`
wangyum commented on code in PR #36755: URL: https://github.com/apache/spark/pull/36755#discussion_r888564235

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala:

```
@@ -288,7 +288,13 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
     case s: Subquery if s.correlated => plan
     case _ if !conf.runtimeFilterSemiJoinReductionEnabled &&
       !conf.runtimeFilterBloomFilterEnabled => plan
-    case _ => tryInjectRuntimeFilter(plan)
+    case _ =>
+      val newPlan = tryInjectRuntimeFilter(plan)
+      if (conf.runtimeFilterSemiJoinReductionEnabled) {
```

Review Comment: +1
[GitHub] [spark] AngersZhuuuu commented on pull request #36736: [SPARK-39351][SQL] SHOW CREATE TABLE should redact properties
AngersZh commented on PR #36736: URL: https://github.com/apache/spark/pull/36736#issuecomment-1145528214 ping @cloud-fan
[GitHub] [spark] AngersZhuuuu commented on a diff in pull request #36754: [SPARK-39367][DOCS][SQL] Review and fix issues in Scala/Java API docs of SQL module
AngersZh commented on code in PR #36754: URL: https://github.com/apache/spark/pull/36754#discussion_r888563938

## sql/catalyst/src/main/java/org/apache/spark/sql/util/NumericHistogram.java:

```
@@ -44,10 +44,14 @@
  * 4. In Hive's code, the method [[merge()]] pass a serialized histogram,
  *    in Spark, this method pass a deserialized histogram.
  *    Here we change the code about merge bins.
+ *
+ * @since 3.3.0
```

Review Comment:
> cc @AngersZh

Later LGTM
[GitHub] [spark] sigmod commented on a diff in pull request #36755: [SPARK-39368][SQL] Move `RewritePredicateSubquery` into `InjectRuntimeFilter`
sigmod commented on code in PR #36755: URL: https://github.com/apache/spark/pull/36755#discussion_r888560970

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala:

```
@@ -288,7 +288,13 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
     case s: Subquery if s.correlated => plan
     case _ if !conf.runtimeFilterSemiJoinReductionEnabled &&
       !conf.runtimeFilterBloomFilterEnabled => plan
-    case _ => tryInjectRuntimeFilter(plan)
+    case _ =>
+      val newPlan = tryInjectRuntimeFilter(plan)
+      if (conf.runtimeFilterSemiJoinReductionEnabled) {
```

Review Comment: how about `if (conf.runtimeFilterSemiJoinReductionEnabled && !plan.fastEquals(newPlan))`?
[GitHub] [spark] HyukjinKwon commented on pull request #36683: [SPARK-39301][SQL][PYTHON] Leverage LocalRelation and respect Arrow batch size in createDataFrame with Arrow optimization
HyukjinKwon commented on PR #36683: URL: https://github.com/apache/spark/pull/36683#issuecomment-1145517326 Gentle ping for a review :-). I know it has some trade-offs, but I believe this addresses more common cases and benefits more users.
[GitHub] [spark] AmplabJenkins commented on pull request #36752: [SPARK-39259] Evaluate timestamps consistently in subqueries
AmplabJenkins commented on PR #36752: URL: https://github.com/apache/spark/pull/36752#issuecomment-1145517157 Can one of the admins verify this patch?
[GitHub] [spark] AmplabJenkins commented on pull request #36753: [SPARK-39259] Evaluate timestamps consistently in subqueries
AmplabJenkins commented on PR #36753: URL: https://github.com/apache/spark/pull/36753#issuecomment-1145517135 Can one of the admins verify this patch?
[GitHub] [spark] beliefer commented on a diff in pull request #36714: [SPARK-39320][SQL] Support aggregate function `MEDIAN`
beliefer commented on code in PR #36714: URL: https://github.com/apache/spark/pull/36714#discussion_r888554738

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/percentiles.scala:

```
@@ -359,6 +359,32 @@ case class Percentile(
     )
   }

+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = "_FUNC_(col) - Returns the median of numeric or ansi interval column `col`.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(col) FROM VALUES (0), (10) AS tab(col);
+       5.0
+      > SELECT _FUNC_(col) FROM VALUES (INTERVAL '0' MONTH), (INTERVAL '10' MONTH) AS tab(col);
+       5.0
+  """,
+  group = "agg_funcs",
+  since = "3.4.0")
+// scalastyle:on line.size.limit
+case class Median(child: Expression)
+  extends AggregateFunction
+  with RuntimeReplaceableAggregate
+  with ImplicitCastInputTypes
+  with UnaryLike[Expression] {
```

Review Comment: I got it.
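For reference, the first documented example in the diff above (the median of 0 and 10 is 5.0) matches the standard interpolating definition of the median for an even-sized input. A quick sketch of the expected values using Python's standard library (not Spark itself):

```python
from statistics import median

# For an even number of values, the median is the midpoint of the two
# middle values -- matching the documented SELECT example above.
print(median([0, 10]))        # midpoint of 0 and 10
print(median([1, 2, 3, 4]))   # midpoint of 2 and 3
```

For an odd number of values, the median is simply the middle element, so no interpolation is needed.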
[GitHub] [spark] wangyum commented on pull request #36755: [SPARK-39368][SQL] Move `RewritePredicateSubquery` into `InjectRuntimeFilter`
wangyum commented on PR #36755: URL: https://github.com/apache/spark/pull/36755#issuecomment-1145503284 cc @sigmod @cloud-fan
[GitHub] [spark] wangyum opened a new pull request, #36755: [SPARK-39368][SQL] Move `RewritePredicateSubquery` into `InjectRuntimeFilter`
wangyum opened a new pull request, #36755: URL: https://github.com/apache/spark/pull/36755

### What changes were proposed in this pull request?
This PR moves `RewritePredicateSubquery` into `InjectRuntimeFilter`.

### Why are the changes needed?
Reduce the number of `RewritePredicateSubquery` runs, since `spark.sql.optimizer.runtimeFilter.semiJoinReduction.enabled` is disabled by default. For example:
```
build/sbt "sql/testOnly *TPCDSQuerySuite"
```
Before this PR:
```
...
org.apache.spark.sql.catalyst.optimizer.RewritePredicateSubquery 17978319 / 31026106 26 / 624
...
```
After this PR:
```
...
org.apache.spark.sql.catalyst.optimizer.RewritePredicateSubquery 16680901 / 18994542 26 / 312
...
```

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Existing tests.
[GitHub] [spark] dongjoon-hyun commented on pull request #36697: [SPARK-39313][SQL] `toCatalystOrdering` should fail if V2Expression can not be translated
dongjoon-hyun commented on PR #36697: URL: https://github.com/apache/spark/pull/36697#issuecomment-1145480808 Thank you, @pan3793 , @sunchao , @cloud-fan !
[GitHub] [spark] HyukjinKwon commented on pull request #36701: [SPARK-39179][PYTHON][TESTS] Improve the test coverage for pyspark/shuffle.py
HyukjinKwon commented on PR #36701: URL: https://github.com/apache/spark/pull/36701#issuecomment-114542 LGTM otherwise.
[GitHub] [spark] HyukjinKwon commented on a diff in pull request #36701: [SPARK-39179][PYTHON][TESTS] Improve the test coverage for pyspark/shuffle.py
HyukjinKwon commented on code in PR #36701: URL: https://github.com/apache/spark/pull/36701#discussion_r888519926

## python/pyspark/tests/test_shuffle.py:

```
@@ -117,6 +169,37 @@ def legit_merge_combiners(x, y):
         m.mergeCombiners(map(lambda x_y1: (x_y1[0], [x_y1[1]]), data))

+class ExternalGroupByTests(unittest.TestCase):
+    def setUp(self):
+        self.N = 1 << 20
+        values = [i for i in range(self.N)]
+        keys = [i for i in range(2)]
+        import itertools
+
+        self.data = [value for value in itertools.product(keys, values)]
+        self.agg = Aggregator(
+            lambda x: [x], lambda x, y: x.append(y) or x, lambda x, y: x.extend(y) or x
+        )
+
+    def test_medium_dataset(self):
+        # SPARK-39179: Test external group by for medium dataset
+        m = ExternalGroupBy(self.agg, 5, partitions=3)
+        m.mergeValues(self.data)
+        self.assertTrue(m.spills >= 1)
+        self.assertEqual(sum(sum(v) for k, v in m.items()), 2 * sum(range(self.N)))
+
+    def test_dataset_with_keys_are_unsorted(self):
+        # SPARK-39179: Test external group by when the number of keys is greater than SORT KEY limit.
+        m = ExternalGroupBy(self.agg, 5, partitions=3)
+        try:
+            m.SORT_KEY_LIMIT = 1
+            m.mergeValues(self.data)
+            self.assertTrue(m.spills >= 1)
+            self.assertEqual(sum(sum(v) for k, v in m.items()), 2 * sum(range(self.N)))
+        finally:
+            m.SORT_KEY_LIMIT = 1000
```

Review Comment: Let's probably do it this way:
```python
origin = m.SORT_KEY_LIMIT
try:
    m.SORT_KEY_LIMIT = ...
finally:
    m.SORT_KEY_LIMIT = origin
```
In this way, if somebody changes the default value in the main code, we won't have to fix the test together.
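An alternative to the hand-written try/finally the reviewer suggests is `unittest.mock.patch.object`, which records the original value and restores it automatically. The sketch below uses a stand-in class (`Merger` and its `SORT_KEY_LIMIT` attribute are illustrative, mirroring `ExternalGroupBy.SORT_KEY_LIMIT` from the diff, not the real PySpark class):

```python
from unittest import mock

class Merger:
    # Stand-in for a class with a tunable class-level limit, analogous to
    # ExternalGroupBy.SORT_KEY_LIMIT in the PR under review.
    SORT_KEY_LIMIT = 1000

# patch.object saves the original value and restores it on exit, even if
# the body raises -- so the test never hard-codes the default (1000).
with mock.patch.object(Merger, "SORT_KEY_LIMIT", 1):
    assert Merger.SORT_KEY_LIMIT == 1

assert Merger.SORT_KEY_LIMIT == 1000  # original value restored
```

This achieves the same effect as saving `origin` manually, with one less variable to get wrong.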
[GitHub] [spark] HyukjinKwon commented on a diff in pull request #36701: [SPARK-39179][PYTHON][TESTS] Improve the test coverage for pyspark/shuffle.py
HyukjinKwon commented on code in PR #36701: URL: https://github.com/apache/spark/pull/36701#discussion_r888519237

## python/pyspark/tests/test_shuffle.py:

```
@@ -54,6 +63,49 @@ def test_medium_dataset(self):
         self.assertTrue(m.spills >= 1)
         self.assertEqual(sum(sum(v) for k, v in m.items()), sum(range(self.N)) * 3)

+    def test_shuffle_data_with_multiple_locations(self):
+        # SPARK-39179: Test shuffle of data with multiple locations; also check
+        # that shuffle locations get randomized
+        with tempfile.TemporaryDirectory() as tempdir1, tempfile.TemporaryDirectory() as tempdir2:
+            os.environ["SPARK_LOCAL_DIRS"] = tempdir1 + "," + tempdir2
+            index_of_tempdir1 = [False, False]
+            for idx in range(10):
+                m = ExternalMerger(self.agg, 20)
+                if m.localdirs[0].startswith(tempdir1):
+                    index_of_tempdir1[0] = True
+                elif m.localdirs[1].startswith(tempdir1):
+                    index_of_tempdir1[1] = True
+                m.mergeValues(self.data)
+                self.assertTrue(m.spills >= 1)
+                self.assertEqual(sum(sum(v) for k, v in m.items()), sum(range(self.N)))
+            self.assertTrue(index_of_tempdir1[0] and (index_of_tempdir1[0] == index_of_tempdir1[1]))
+            del os.environ["SPARK_LOCAL_DIRS"]
```

Review Comment: @pralabhkumar let's probably restore the original value so other tests won't be affected. For example, after this test, `SPARK_LOCAL_DIRS` will be removed.
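A common pattern for the restore the reviewer is asking for: save whatever value (possibly none) the environment variable had before the test, then re-set or delete it in a `finally` block. The directory values below are placeholders, not from the PR:

```python
import os

VAR = "SPARK_LOCAL_DIRS"

# Save the pre-existing value; None means the variable was unset.
saved = os.environ.get(VAR)
try:
    os.environ[VAR] = "/tmp/dir1,/tmp/dir2"
    assert os.environ[VAR] == "/tmp/dir1,/tmp/dir2"
finally:
    # Restore the previous state exactly: re-set the old value if there
    # was one, otherwise remove the variable again.
    if saved is None:
        os.environ.pop(VAR, None)
    else:
        os.environ[VAR] = saved

assert os.environ.get(VAR) == saved
```

`unittest.mock.patch.dict(os.environ, {...})` encapsulates the same save/restore in a context manager, which is often tidier in test code.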
[GitHub] [spark] HyukjinKwon closed pull request #36754: [SPARK-39367][DOCS][SQL] Review and fix issues in Scala/Java API docs of SQL module
HyukjinKwon closed pull request #36754: [SPARK-39367][DOCS][SQL] Review and fix issues in Scala/Java API docs of SQL module URL: https://github.com/apache/spark/pull/36754
[GitHub] [spark] HyukjinKwon commented on pull request #36754: [SPARK-39367][DOCS][SQL] Review and fix issues in Scala/Java API docs of SQL module
HyukjinKwon commented on PR #36754: URL: https://github.com/apache/spark/pull/36754#issuecomment-1145475718 Merged to master and branch-3.3.
[GitHub] [spark] sunchao commented on pull request #36750: [SPARK-29260][SQL] Support `ALTER DATABASE SET LOCATION` if HMS supports
sunchao commented on PR #36750: URL: https://github.com/apache/spark/pull/36750#issuecomment-1145471603

> Lastly, could you make the PR description up-to-date? For example, the following seems to need some changes.
> > This PR removes the check so that the command works as long as the Hive version used by the HMS...

Updated the PR description.
[GitHub] [spark] github-actions[bot] closed pull request #35329: [SPARK-33326][SQL] Update Partition statistic parameters after ANALYZE TABLE ... PARTITION()
github-actions[bot] closed pull request #35329: [SPARK-33326][SQL] Update Partition statistic parameters after ANALYZE TABLE ... PARTITION() URL: https://github.com/apache/spark/pull/35329
[GitHub] [spark] dongjoon-hyun commented on a diff in pull request #36750: [SPARK-29260][SQL] Support `ALTER DATABASE SET LOCATION` if HMS supports
dongjoon-hyun commented on code in PR #36750: URL: https://github.com/apache/spark/pull/36750#discussion_r888508050

## sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala:

```
@@ -355,14 +355,17 @@ private[hive] class HiveClientImpl(
   }

   override def alterDatabase(database: CatalogDatabase): Unit = withHiveState {
-    if (!getDatabase(database.name).locationUri.equals(database.locationUri)) {
-      // SPARK-29260: Enable supported versions once it support altering database location.
-      if (!(version.equals(hive.v3_0) || version.equals(hive.v3_1))) {
-        throw QueryCompilationErrors.alterDatabaseLocationUnsupportedError(version.fullVersion)
-      }
-    }
+    val loc = getDatabase(database.name).locationUri
+    val changeLoc = !database.locationUri.equals(loc)
+
     val hiveDb = toHiveDatabase(database)
     shim.alterDatabase(client, database.name, hiveDb)
+
+    if (changeLoc && getDatabase(database.name).locationUri.equals(loc)) {
+      // Some Hive versions don't support changing database location, so we check here to see if
+      // the location is actually changed, and throw an error if not.
+      throw QueryCompilationErrors.alterDatabaseLocationUnsupportedError()
+    }
```

Review Comment: If some HMS forks raise exceptions, that will be enough because they will propagate to Spark users. So, this check is only for a silent HMS.
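The read-back check in the new code above is a general pattern: when a backend may silently ignore an update, re-read the value after the call and fail loudly if nothing changed. A language-neutral sketch in Python (the stub names are invented for illustration, not Spark or Hive APIs):

```python
class UnsupportedOperationError(RuntimeError):
    pass

class MetastoreStub:
    """Toy metastore that silently ignores location changes,
    like the silent-HMS case discussed in the review."""
    def __init__(self, location):
        self._location = location

    def get_location(self):
        return self._location

    def alter_database(self, new_location):
        # Simulates an HMS that accepts the call but does nothing.
        pass

def alter_database_location(store, new_location):
    old = store.get_location()
    store.alter_database(new_location)
    # Read back: if a change was requested but nothing changed,
    # raise an explicit error instead of failing silently.
    if new_location != old and store.get_location() == old:
        raise UnsupportedOperationError(
            "backend does not support altering database location")
```

As the reviewer notes, a backend that throws its own exception needs no such check; the read-back only catches the silent case.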
[GitHub] [spark] HyukjinKwon commented on a diff in pull request #36693: [SPARK-39349] Add a centralized CheckError method for QA of error path
HyukjinKwon commented on code in PR #36693: URL: https://github.com/apache/spark/pull/36693#discussion_r888508847

## sql/catalyst/src/main/scala/org/apache/spark/sql/AnalysisException.scala:

```
@@ -54,10 +72,34 @@ class AnalysisException protected[sql] (
       messageParameters: Array[String],
       origin: Origin) =
     this(
-      SparkThrowableHelper.getMessage(errorClass, messageParameters),
+      SparkThrowableHelper.getMessage(errorClass, None, messageParameters),
       line = origin.line,
       startPosition = origin.startPosition,
       errorClass = Some(errorClass),
       errorSubClass = None,
       messageParameters = messageParameters)
+
+  def this(
+    errorClass: String,
+    errorSubClass: String,
+    messageParameters: Array[String]) =
```

Review Comment: Just dropping a comment for a nit. Maybe it would have to be four spaces per https://github.com/databricks/scala-style-guide#spacing-and-indentation:
```scala
def this(
    errorClass: String,
    errorSubClass: String,
    messageParameters: Array[String]) =
```
[GitHub] [spark] wangyum commented on a diff in pull request #36750: [SPARK-29260][SQL] Support `ALTER DATABASE SET LOCATION` if HMS supports
wangyum commented on code in PR #36750: URL: https://github.com/apache/spark/pull/36750#discussion_r888507803

## sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala:

```
@@ -355,14 +355,17 @@ private[hive] class HiveClientImpl(
   }

   override def alterDatabase(database: CatalogDatabase): Unit = withHiveState {
-    if (!getDatabase(database.name).locationUri.equals(database.locationUri)) {
-      // SPARK-29260: Enable supported versions once it support altering database location.
-      if (!(version.equals(hive.v3_0) || version.equals(hive.v3_1))) {
-        throw QueryCompilationErrors.alterDatabaseLocationUnsupportedError(version.fullVersion)
-      }
-    }
+    val loc = getDatabase(database.name).locationUri
+    val changeLoc = !database.locationUri.equals(loc)
+
     val hiveDb = toHiveDatabase(database)
     shim.alterDatabase(client, database.name, hiveDb)
+
+    if (changeLoc && getDatabase(database.name).locationUri.equals(loc)) {
+      // Some Hive versions don't support changing database location, so we check here to see if
+      // the location is actually changed, and throw an error if not.
+      throw QueryCompilationErrors.alterDatabaseLocationUnsupportedError()
+    }
```

Review Comment: Just silently ignore it.
[GitHub] [spark] viirya commented on a diff in pull request #36750: [SPARK-29260][SQL] Support `ALTER DATABASE SET LOCATION` if HMS supports
viirya commented on code in PR #36750: URL: https://github.com/apache/spark/pull/36750#discussion_r888506874

## sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala:

```
@@ -355,14 +355,17 @@ private[hive] class HiveClientImpl(
   }

   override def alterDatabase(database: CatalogDatabase): Unit = withHiveState {
-    if (!getDatabase(database.name).locationUri.equals(database.locationUri)) {
-      // SPARK-29260: Enable supported versions once it support altering database location.
-      if (!(version.equals(hive.v3_0) || version.equals(hive.v3_1))) {
-        throw QueryCompilationErrors.alterDatabaseLocationUnsupportedError(version.fullVersion)
-      }
-    }
+    val loc = getDatabase(database.name).locationUri
+    val changeLoc = !database.locationUri.equals(loc)
+
     val hiveDb = toHiveDatabase(database)
     shim.alterDatabase(client, database.name, hiveDb)
+
+    if (changeLoc && getDatabase(database.name).locationUri.equals(loc)) {
+      // Some Hive versions don't support changing database location, so we check here to see if
+      // the location is actually changed, and throw an error if not.
+      throw QueryCompilationErrors.alterDatabaseLocationUnsupportedError()
+    }
```

Review Comment: So for a Hive server which doesn't support it, there is no exception from `alterDatabase` above; it just silently ignores the change?
[GitHub] [spark] viirya commented on a diff in pull request #36750: [SPARK-29260][SQL] Support `ALTER DATABASE SET LOCATION` if HMS supports
viirya commented on code in PR #36750: URL: https://github.com/apache/spark/pull/36750#discussion_r888506364

## sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala:

```
@@ -1628,8 +1628,8 @@ object QueryCompilationErrors extends QueryErrorsBase {
     new AnalysisException(s"$tableIdentifier should be converted to HadoopFsRelation.")
   }

-  def alterDatabaseLocationUnsupportedError(version: String): Throwable = {
-    new AnalysisException(s"Hive $version does not support altering database location")
+  def alterDatabaseLocationUnsupportedError(): Throwable = {
+    new AnalysisException(s"Hive metastore does not support altering database location")
```

Review Comment: nit:
```suggestion
    new AnalysisException("Hive metastore does not support altering database location")
```
[GitHub] [spark] dtenedor commented on pull request #36745: [SPARK-39359][SQL] Restrict DEFAULT columns to allowlist of supported data source types
dtenedor commented on PR #36745: URL: https://github.com/apache/spark/pull/36745#issuecomment-1145458556 @sadikovi thanks for your review, these are helpful ideas! Please look again when you have time.
[GitHub] [spark] dtenedor commented on a diff in pull request #36745: [SPARK-39359][SQL] Restrict DEFAULT columns to allowlist of supported data source types
dtenedor commented on code in PR #36745: URL: https://github.com/apache/spark/pull/36745#discussion_r888505739

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala:

```
@@ -427,6 +428,7 @@ class SessionCatalog(
       tableDefinition.copy(identifier = tableIdentifier)
     }

+    ResolveDefaultColumns.checkDataSourceSupportsDefaultColumns(tableDefinition)
```

Review Comment: This is not supported currently; the feature is SQL-only as of now. References to columns named "default" will simply return "column not found" errors until then. We can certainly consider adding this feature to the DataFrame API and PySpark later; I would be interested in helping with this.
[GitHub] [spark] dtenedor commented on a diff in pull request #36745: [SPARK-39359][SQL] Restrict DEFAULT columns to allowlist of supported data source types
dtenedor commented on code in PR #36745: URL: https://github.com/apache/spark/pull/36745#discussion_r888505435 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala: ## @@ -231,4 +232,18 @@ object ResolveDefaultColumns { } } } + + def checkDataSourceSupportsDefaultColumns(table: CatalogTable): Unit = { +if (table.schema.fields.map(_.metadata).exists { m => + m.contains(CURRENT_DEFAULT_COLUMN_METADATA_KEY) || +m.contains(EXISTS_DEFAULT_COLUMN_METADATA_KEY) +}) { + table.provider.getOrElse("").toLowerCase() match { +case "csv" | "json" | "parquet" | "orc" => Review Comment: Good ideas! I made this a configuration option. It does support DataSource V2; I added a couple more tests to show that.
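The quoted diff hard-codes the supported providers in a match; the follow-up comment says this became a configuration option. A hedged sketch of what such a config-driven allowlist check might look like (the comma-separated config shape and all names here are assumptions, not the actual Spark option):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

// Hypothetical allowlist check: a data source may declare DEFAULT columns
// only if its provider name appears in a (assumed) comma-separated config.
public class DefaultColumnAllowlist {
  static boolean providerSupportsDefaultColumns(String provider, String allowlistConf) {
    if (provider == null) {
      return false; // no provider recorded for the table
    }
    // Compare case-insensitively, mirroring the toLowerCase() in the quoted diff.
    List<String> allowed =
        Arrays.asList(allowlistConf.toLowerCase(Locale.ROOT).split("\\s*,\\s*"));
    return allowed.contains(provider.toLowerCase(Locale.ROOT));
  }
}
```

A table whose provider is outside the allowlist would then be rejected at CREATE TABLE time, analogous to the `checkDataSourceSupportsDefaultColumns` call in the SessionCatalog diff above.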
[GitHub] [spark] dongjoon-hyun commented on pull request #36750: [SPARK-29260][SQL] Support alter database location for Hive client versions other than 3.0/3.1
dongjoon-hyun commented on PR #36750: URL: https://github.com/apache/spark/pull/36750#issuecomment-1145456039 Lastly, could you make the PR description up-to-date? For example, the following? > This PR removes the check so that the command works as long as the Hive version used by the HMS...
[GitHub] [spark] sunchao commented on a diff in pull request #36750: [SPARK-29260][SQL] Support alter database location for Hive client versions other than 3.0/3.1
sunchao commented on code in PR #36750: URL: https://github.com/apache/spark/pull/36750#discussion_r888490510 ## sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala: ## @@ -165,19 +165,19 @@ class HiveClientSuite(version: String, allVersions: Seq[String]) assert(client.getDatabase("temporary").properties.contains("flag")) // test alter database location +val oldDatabasePath = database.locationUri.getPath val tempDatabasePath2 = Utils.createTempDir().toURI -// Hive support altering database location since HIVE-8472. +client.alterDatabase(database.copy(locationUri = tempDatabasePath2)) +val uriInCatalog = client.getDatabase("temporary").locationUri +assert("file" === uriInCatalog.getScheme) + if (version == "3.0" || version == "3.1") { - client.alterDatabase(database.copy(locationUri = tempDatabasePath2)) - val uriInCatalog = client.getDatabase("temporary").locationUri - assert("file" === uriInCatalog.getScheme) + // Hive support altering database location since HIVE-8472 assert(new Path(tempDatabasePath2.getPath).toUri.getPath === uriInCatalog.getPath, "Failed to alter database location") } else { - val e = intercept[AnalysisException] { -client.alterDatabase(database.copy(locationUri = tempDatabasePath2)) - } - assert(e.getMessage.contains("does not support altering database location")) + // .. otherwise, the command should be non-effective against older versions of Hive + assert(oldDatabasePath === uriInCatalog.getPath, "Expected database location to be unchanged") Review Comment: Updated the PR
[GitHub] [spark] dongjoon-hyun commented on a diff in pull request #36750: [SPARK-29260][SQL] Support alter database location for Hive client versions other than 3.0/3.1
dongjoon-hyun commented on code in PR #36750: URL: https://github.com/apache/spark/pull/36750#discussion_r888477684 ## sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala: ## @@ -165,19 +165,19 @@ class HiveClientSuite(version: String, allVersions: Seq[String]) assert(client.getDatabase("temporary").properties.contains("flag")) // test alter database location +val oldDatabasePath = database.locationUri.getPath val tempDatabasePath2 = Utils.createTempDir().toURI -// Hive support altering database location since HIVE-8472. +client.alterDatabase(database.copy(locationUri = tempDatabasePath2)) +val uriInCatalog = client.getDatabase("temporary").locationUri +assert("file" === uriInCatalog.getScheme) + if (version == "3.0" || version == "3.1") { - client.alterDatabase(database.copy(locationUri = tempDatabasePath2)) - val uriInCatalog = client.getDatabase("temporary").locationUri - assert("file" === uriInCatalog.getScheme) + // Hive support altering database location since HIVE-8472 assert(new Path(tempDatabasePath2.getPath).toUri.getPath === uriInCatalog.getPath, "Failed to alter database location") } else { - val e = intercept[AnalysisException] { -client.alterDatabase(database.copy(locationUri = tempDatabasePath2)) - } - assert(e.getMessage.contains("does not support altering database location")) + // .. otherwise, the command should be non-effective against older versions of Hive + assert(oldDatabasePath === uriInCatalog.getPath, "Expected database location to be unchanged") Review Comment: Sorry for misleading you, @sunchao. I was wondering about this case, which didn't throw an exception. If there is no sign of failure (or a no-op), the customer will be surprised at the next session.
[GitHub] [spark] dongjoon-hyun commented on a diff in pull request #36750: [SPARK-29260][SQL] Support alter database location for Hive client versions other than 3.0/3.1
dongjoon-hyun commented on code in PR #36750: URL: https://github.com/apache/spark/pull/36750#discussion_r888482892 ## sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala: ## @@ -165,19 +165,19 @@ class HiveClientSuite(version: String, allVersions: Seq[String]) assert(client.getDatabase("temporary").properties.contains("flag")) // test alter database location +val oldDatabasePath = database.locationUri.getPath val tempDatabasePath2 = Utils.createTempDir().toURI -// Hive support altering database location since HIVE-8472. +client.alterDatabase(database.copy(locationUri = tempDatabasePath2)) +val uriInCatalog = client.getDatabase("temporary").locationUri +assert("file" === uriInCatalog.getScheme) + if (version == "3.0" || version == "3.1") { - client.alterDatabase(database.copy(locationUri = tempDatabasePath2)) - val uriInCatalog = client.getDatabase("temporary").locationUri - assert("file" === uriInCatalog.getScheme) + // Hive support altering database location since HIVE-8472 assert(new Path(tempDatabasePath2.getPath).toUri.getPath === uriInCatalog.getPath, "Failed to alter database location") } else { - val e = intercept[AnalysisException] { -client.alterDatabase(database.copy(locationUri = tempDatabasePath2)) - } - assert(e.getMessage.contains("does not support altering database location")) + // .. otherwise, the command should be non-effective against older versions of Hive + assert(oldDatabasePath === uriInCatalog.getPath, "Expected database location to be unchanged") Review Comment: +1 for throwing exception!
[GitHub] [spark] sunchao commented on a diff in pull request #36750: [SPARK-29260][SQL] Support alter database location for Hive client versions other than 3.0/3.1
sunchao commented on code in PR #36750: URL: https://github.com/apache/spark/pull/36750#discussion_r888482028 ## sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala: ## @@ -165,19 +165,19 @@ class HiveClientSuite(version: String, allVersions: Seq[String]) assert(client.getDatabase("temporary").properties.contains("flag")) // test alter database location +val oldDatabasePath = database.locationUri.getPath val tempDatabasePath2 = Utils.createTempDir().toURI -// Hive support altering database location since HIVE-8472. +client.alterDatabase(database.copy(locationUri = tempDatabasePath2)) +val uriInCatalog = client.getDatabase("temporary").locationUri +assert("file" === uriInCatalog.getScheme) + if (version == "3.0" || version == "3.1") { - client.alterDatabase(database.copy(locationUri = tempDatabasePath2)) - val uriInCatalog = client.getDatabase("temporary").locationUri - assert("file" === uriInCatalog.getScheme) + // Hive support altering database location since HIVE-8472 assert(new Path(tempDatabasePath2.getPath).toUri.getPath === uriInCatalog.getPath, "Failed to alter database location") } else { - val e = intercept[AnalysisException] { -client.alterDatabase(database.copy(locationUri = tempDatabasePath2)) - } - assert(e.getMessage.contains("does not support altering database location")) + // .. otherwise, the command should be non-effective against older versions of Hive + assert(oldDatabasePath === uriInCatalog.getPath, "Expected database location to be unchanged") Review Comment: Good idea! Instead of warning, I'm thinking maybe we should throw an exception when the location is not changed.
[GitHub] [spark] dongjoon-hyun commented on a diff in pull request #36750: [SPARK-29260][SQL] Support alter database location for Hive client versions other than 3.0/3.1
dongjoon-hyun commented on code in PR #36750: URL: https://github.com/apache/spark/pull/36750#discussion_r888478943 ## sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala: ## @@ -165,19 +165,19 @@ class HiveClientSuite(version: String, allVersions: Seq[String]) assert(client.getDatabase("temporary").properties.contains("flag")) // test alter database location +val oldDatabasePath = database.locationUri.getPath val tempDatabasePath2 = Utils.createTempDir().toURI -// Hive support altering database location since HIVE-8472. +client.alterDatabase(database.copy(locationUri = tempDatabasePath2)) +val uriInCatalog = client.getDatabase("temporary").locationUri +assert("file" === uriInCatalog.getScheme) + if (version == "3.0" || version == "3.1") { - client.alterDatabase(database.copy(locationUri = tempDatabasePath2)) - val uriInCatalog = client.getDatabase("temporary").locationUri - assert("file" === uriInCatalog.getScheme) + // Hive support altering database location since HIVE-8472 assert(new Path(tempDatabasePath2.getPath).toUri.getPath === uriInCatalog.getPath, "Failed to alter database location") } else { - val e = intercept[AnalysisException] { -client.alterDatabase(database.copy(locationUri = tempDatabasePath2)) - } - assert(e.getMessage.contains("does not support altering database location")) + // .. otherwise, the command should be non-effective against older versions of Hive + assert(oldDatabasePath === uriInCatalog.getPath, "Expected database location to be unchanged") Review Comment: Can we double-check the location inside this method after invoking `shim.alterDatabase`? https://github.com/apache/spark/blob/52e2717c2d1b6e1f449de5714b6e202074bac26f/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala#L357-L366
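The "double-check after `shim.alterDatabase`" suggestion can be sketched against a toy in-memory catalog (this is not the real HiveClientImpl/shim API; the class and method names here are illustrative assumptions): after asking the metastore to alter the database location, read the location back and throw if it did not change, rather than failing silently.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of the "read back and verify" pattern from the review thread.
public class AlterDatabaseLocationCheck {
  // Stand-in for the Hive metastore's database -> location mapping.
  final Map<String, String> dbLocations = new HashMap<>();
  // Models whether the remote HMS honors location changes (HIVE-8472).
  final boolean metastoreSupportsAlterLocation;

  AlterDatabaseLocationCheck(boolean supports) {
    this.metastoreSupportsAlterLocation = supports;
  }

  void alterDatabaseLocation(String db, String newLocation) {
    if (metastoreSupportsAlterLocation) {
      dbLocations.put(db, newLocation); // stands in for shim.alterDatabase(...)
    }
    // Double-check: older Hive versions silently ignore the location change.
    if (!newLocation.equals(dbLocations.get(db))) {
      throw new IllegalStateException(
          "Hive metastore did not alter location of database '" + db + "'");
    }
  }
}
```

With this shape, the no-op case surfaces as an exception at command time instead of surprising the user in a later session.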
[GitHub] [spark] sunchao commented on a diff in pull request #36750: [SPARK-29260][SQL] Support alter database location for Hive client versions other than 3.0/3.1
sunchao commented on code in PR #36750: URL: https://github.com/apache/spark/pull/36750#discussion_r888477757 ## sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala: ## @@ -165,19 +165,19 @@ class HiveClientSuite(version: String, allVersions: Seq[String]) assert(client.getDatabase("temporary").properties.contains("flag")) // test alter database location +val oldDatabasePath = database.locationUri.getPath val tempDatabasePath2 = Utils.createTempDir().toURI -// Hive support altering database location since HIVE-8472. +client.alterDatabase(database.copy(locationUri = tempDatabasePath2)) +val uriInCatalog = client.getDatabase("temporary").locationUri +assert("file" === uriInCatalog.getScheme) + if (version == "3.0" || version == "3.1") { - client.alterDatabase(database.copy(locationUri = tempDatabasePath2)) - val uriInCatalog = client.getDatabase("temporary").locationUri - assert("file" === uriInCatalog.getScheme) + // Hive support altering database location since HIVE-8472 assert(new Path(tempDatabasePath2.getPath).toUri.getPath === uriInCatalog.getPath, "Failed to alter database location") } else { - val e = intercept[AnalysisException] { -client.alterDatabase(database.copy(locationUri = tempDatabasePath2)) - } - assert(e.getMessage.contains("does not support altering database location")) + // .. otherwise, the command should be non-effective against older versions of Hive + assert(oldDatabasePath === uriInCatalog.getPath, "Expected database location to be unchanged") Review Comment: It's hard for Spark to give a warning in this case, since it doesn't know the Hive version used by the remote Hive metastore. It's possible in unit tests, since both the client and the metastore are of the same Hive version.
[GitHub] [spark] dongjoon-hyun commented on a diff in pull request #36750: [SPARK-29260][SQL] Support alter database location for Hive client versions other than 3.0/3.1
dongjoon-hyun commented on code in PR #36750: URL: https://github.com/apache/spark/pull/36750#discussion_r888476932 ## sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala: ## @@ -165,19 +165,19 @@ class HiveClientSuite(version: String, allVersions: Seq[String]) assert(client.getDatabase("temporary").properties.contains("flag")) // test alter database location +val oldDatabasePath = database.locationUri.getPath val tempDatabasePath2 = Utils.createTempDir().toURI -// Hive support altering database location since HIVE-8472. +client.alterDatabase(database.copy(locationUri = tempDatabasePath2)) +val uriInCatalog = client.getDatabase("temporary").locationUri +assert("file" === uriInCatalog.getScheme) + if (version == "3.0" || version == "3.1") { - client.alterDatabase(database.copy(locationUri = tempDatabasePath2)) - val uriInCatalog = client.getDatabase("temporary").locationUri - assert("file" === uriInCatalog.getScheme) + // Hive support altering database location since HIVE-8472 assert(new Path(tempDatabasePath2.getPath).toUri.getPath === uriInCatalog.getPath, "Failed to alter database location") } else { - val e = intercept[AnalysisException] { -client.alterDatabase(database.copy(locationUri = tempDatabasePath2)) - } - assert(e.getMessage.contains("does not support altering database location")) + // .. otherwise, the command should be non-effective against older versions of Hive + assert(oldDatabasePath === uriInCatalog.getPath, "Expected database location to be unchanged") Review Comment: If there is no exception from HMS side in this case, we had better give a warning from Spark side here.
[GitHub] [spark] sunchao commented on pull request #36750: [SPARK-29260][SQL] Support alter database location for Hive client versions other than 3.0/3.1
sunchao commented on PR #36750: URL: https://github.com/apache/spark/pull/36750#issuecomment-1145425002 The `ALTER DATABASE SET LOCATION` command will change the default location for new tables created afterwards. So in step 2) above, if table location is not explicitly specified, the new tables will be created under the new location defined in step 1), while the old tables remain unchanged.
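The semantics described above can be captured in a toy model (this is not Spark's catalog API; all names are illustrative): `ALTER DATABASE SET LOCATION` only changes the default for tables created afterwards, while tables created earlier keep the location they resolved at creation time.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of a database whose default location feeds new table locations.
public class DatabaseLocationModel {
  String defaultLocation;
  final Map<String, String> tableLocations = new HashMap<>();

  DatabaseLocationModel(String initialLocation) {
    this.defaultLocation = initialLocation;
  }

  // A table without an explicit location resolves it at creation time.
  void createTable(String name) {
    tableLocations.put(name, defaultLocation + "/" + name);
  }

  // Only affects tables created after this call.
  void alterDatabaseSetLocation(String newLocation) {
    this.defaultLocation = newLocation;
  }
}
```

Creating a table, altering the database location, then creating a second table leaves the first table at the old path and places only the second table under the new one.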
[GitHub] [spark] sunchao commented on pull request #36750: [SPARK-29260][SQL] Support alter database location for Hive client versions other than 3.0/3.1
sunchao commented on PR #36750: URL: https://github.com/apache/spark/pull/36750#issuecomment-1145407388 Fixed. @viirya pls take another look, thanks.
[GitHub] [spark] holdenk commented on pull request #36434: [SPARK-38969][K8S] Fix Decom reporting
holdenk commented on PR #36434: URL: https://github.com/apache/spark/pull/36434#issuecomment-1145371331 Update: with the change for increased resilience, it passes integration tests on my machine.
[GitHub] [spark] amaliujia commented on a diff in pull request #36586: [SPARK-39236][SQL] Make CreateTable and ListTables be compatible with 3 layer namespace
amaliujia commented on code in PR #36586: URL: https://github.com/apache/spark/pull/36586#discussion_r888383953 ## sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala: ## @@ -299,15 +313,18 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest { val functionFields = ScalaReflection.getConstructorParameterValues(function) val columnFields = ScalaReflection.getConstructorParameterValues(column) assert(dbFields == Seq("nama", "descripta", "locata")) -assert(tableFields == Seq("nama", "databasa", "descripta", "typa", false)) +assert(Seq(tableFields.apply(0), tableFields.apply(2), tableFields.apply(3), Review Comment: I didn't do this because the 3rd parameter is the namespace as an Array, so `==` does not do a deep compare.
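The array-equality point holds in Java terms as well: `==` on arrays is a reference comparison (and `Object.equals` on arrays behaves the same), so an element-wise helper such as `java.util.Arrays.equals` is needed. A minimal illustration:

```java
import java.util.Arrays;

// Demonstrates why comparing a namespace Array with == fails even when
// the contents are identical: only Arrays.equals compares element-wise.
public class ArrayEquality {
  static boolean referenceEquals(String[] a, String[] b) {
    return a == b; // true only for the very same array object
  }

  static boolean deepEquals(String[] a, String[] b) {
    return Arrays.equals(a, b); // element-wise comparison
  }
}
```

This is why the test above compares selected fields individually rather than relying on `==` over a `Seq` containing the namespace array.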
[GitHub] [spark] amaliujia commented on a diff in pull request #36586: [SPARK-39236][SQL] Make CreateTable and ListTables be compatible with 3 layer namespace
amaliujia commented on code in PR #36586: URL: https://github.com/apache/spark/pull/36586#discussion_r888374320 ## sql/core/src/main/scala/org/apache/spark/sql/catalog/interface.scala: ## @@ -64,12 +64,26 @@ class Database( @Stable class Table( val name: String, -@Nullable val database: String, +@Nullable val qualifier: Array[String], @Nullable val description: String, val tableType: String, val isTemporary: Boolean) extends DefinedByConstructorParams { + def database: String = parseQualifier + + def parseQualifier: String = { +if (qualifier == null) { + null +} else if (qualifier.length == 2) { + qualifier(1) +} else if (qualifier.length == 1) { Review Comment: For example, if a table is `catalog1.db1.table1`, is it the case that `catalog_name=catalog1, qualifier=[db1], table_name=table1`?
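A hypothetical Java rendering of the `parseQualifier` logic quoted above may make the branching easier to follow. Note the 1-element branch is truncated in the quoted diff, so returning the single element there is an assumption on my part, not confirmed by the source:

```java
// Sketch of extracting the database name from a namespace qualifier array.
public class TableQualifier {
  static String database(String[] qualifier) {
    if (qualifier == null) {
      return null;           // no namespace information
    } else if (qualifier.length == 2) {
      return qualifier[1];   // [catalog, db] -> db
    } else if (qualifier.length == 1) {
      return qualifier[0];   // [db] -> db (assumed; branch is truncated above)
    }
    return null;             // other shapes: undefined in the quoted diff
  }
}
```

Under this reading, the `catalog1.db1.table1` example in the comment yields `database(["db1"]) == "db1"`.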
[GitHub] [spark] JoshRosen commented on pull request #36751: [WIP][SPARK-39336][CORE] Do not release write locks on task end.
JoshRosen commented on PR #36751: URL: https://github.com/apache/spark/pull/36751#issuecomment-1145291954 If I recall correctly, the original motivation for this "release all locks at the end of the task" code was to prevent indefinite "pin leaks" when tasks fail to properly release locks (e.g. because an iterator isn't fully consumed or because the task crashes). I see how this is a problem in the case of multi-threaded tasks. As you pointed out in your comment at https://github.com/apache/spark/pull/35991#discussion_r887524913,
> The good news is that the code that takes out the write locks is only in the BlockManager and that it properly cleans up after itself (release locks, remove data if needed).
This means we could fix this issue by removing the release of write locks from `releaseAllLocksForTask`. It might be a good idea to mention this "write locks are only managed in the BlockManager" property in a code comment near where you're changing the release-all-locks code.

In principle, aren't we also vulnerable to race conditions for read locks? Let's say that I've configured Spark to use off-heap memory and I have a task where a secondary task thread is reading from that block. If the main task thread exits and releases the read locks, then the block manager could decide to evict the off-heap block while it's still being read in the secondary thread. I think that would be a use-after-free bug which could trigger a segfault. Perhaps that read-lock race condition is rarer and less severe from a correctness point of view:
- In this scenario, the reader is a task thread belonging to the finished task.
- If the reader segfaults, then the JVM will die and we'll reschedule the tasks. This isn't ideal, but it is recoverable, unlike the writer scenario.
- If the reader reads corrupt data because the memory region was re-used in a subsequent allocation, then I think we're okay as long as the reading thread isn't performing external side effects based on the possibly-corrupt data (e.g. posting data to an external REST API).
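The pinning invariant behind the read-lock discussion can be sketched generically with a read/write lock (this is not the BlockManager API, just an illustration of the principle): eviction needs the write lock, so it cannot proceed while any reader still holds a read lock ("pin") on the block.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Generic sketch: a block may be evicted only when no reader is pinning it.
public class BlockPinSketch {
  final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

  void pinForRead() { lock.readLock().lock(); }

  void unpin() { lock.readLock().unlock(); }

  // Returns true only if no reader currently pins the block; mirrors the
  // rule that releasing read locks early enables use-after-free on eviction.
  boolean tryEvict() {
    if (lock.writeLock().tryLock()) {
      lock.writeLock().unlock(); // in a real system, free the block here first
      return true;
    }
    return false;
  }
}
```

Releasing the read lock while a secondary thread still reads is exactly what lets `tryEvict` succeed prematurely, which is the use-after-free hazard described above.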
[GitHub] [spark] gengliangwang commented on a diff in pull request #36754: [SPARK-39367][DOCS][SQL] Review and fix issues in Scala/Java API docs of SQL module
gengliangwang commented on code in PR #36754: URL: https://github.com/apache/spark/pull/36754#discussion_r888368841 ## sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala: ## @@ -46,7 +46,7 @@ import org.apache.spark.sql.types._ * As commands are executed eagerly, this also includes errors thrown during the execution of * commands, which users can see immediately. */ -object QueryCompilationErrors extends QueryErrorsBase { +private[sql] object QueryCompilationErrors extends QueryErrorsBase { Review Comment: This was a mistake on 3.2.1. Let's fix it in 3.3.0 ## sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryErrorsBase.scala: ## @@ -42,7 +42,7 @@ import org.apache.spark.sql.types.{DataType, DoubleType, FloatType} * 7. SQL expressions shall be wrapped by double quotes. * For example: "earnings + 1". */ -trait QueryErrorsBase { +private[sql] trait QueryErrorsBase { Review Comment: This was a mistake on 3.2.1. Let's fix it in 3.3.0 ## sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala: ## @@ -66,7 +66,7 @@ import org.apache.spark.util.CircularBuffer * This does not include exceptions thrown during the eager execution of commands, which are * grouped into [[QueryCompilationErrors]]. */ -object QueryExecutionErrors extends QueryErrorsBase { +private[sql] object QueryExecutionErrors extends QueryErrorsBase { Review Comment: This was a mistake on 3.2.1. Let's fix it in 3.3.0
[GitHub] [spark] gengliangwang commented on a diff in pull request #36754: [SPARK-39367][DOCS][SQL] Review and fix issues in Scala/Java API docs of SQL module
gengliangwang commented on code in PR #36754: URL: https://github.com/apache/spark/pull/36754#discussion_r888303216 ## sql/catalyst/src/main/java/org/apache/spark/sql/util/NumericHistogram.java: ## @@ -44,10 +44,14 @@ * 4. In Hive's code, the method [[merge()] pass a serialized histogram, * in Spark, this method pass a deserialized histogram. * Here we change the code about merge bins. + * + * @since 3.3.0 Review Comment: cc @AngersZh
[GitHub] [spark] gengliangwang opened a new pull request, #36754: [SPARK-39367][DOCS][SQL] Review and fix issues in Scala/Java API docs of SQL module
gengliangwang opened a new pull request, #36754: URL: https://github.com/apache/spark/pull/36754 ### What changes were proposed in this pull request? Compare the 3.3.0 API doc with the latest release version 3.2.1. Fix the following issues: * Add missing Since annotation for new APIs * Remove the leaking class/object in API doc ### Why are the changes needed? Improve API docs ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing UT
[GitHub] [spark] zhouyejoe commented on a diff in pull request #35906: [SPARK-33236][shuffle] Enable Push-based shuffle service to store state in NM level DB for work preserving restart
zhouyejoe commented on code in PR #35906: URL: https://github.com/apache/spark/pull/35906#discussion_r888299878 ## common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java: ## @@ -655,6 +744,156 @@ public void registerExecutor(String appId, ExecutorShuffleInfo executorInfo) { } } + + @Override + public void close() { +if (db != null) { + try { +db.close(); + } catch (IOException e) { +logger.error("Exception closing leveldb with registered app paths info and " ++ "shuffle partition info", e); + } +} + } + + private void writeAppPathsInfoToDb(String appId, int attemptId, AppPathsInfo appPathsInfo) { +if (db != null) { + try { +byte[] key = getDbAppAttemptPathsKey(new AppAttemptId(appId, attemptId)); +String valueStr = mapper.writeValueAsString(appPathsInfo); +byte[] value = valueStr.getBytes(StandardCharsets.UTF_8); +db.put(key, value); + } catch (Exception e) { +logger.error("Error saving registered app paths info", e); + } +} + } + + private void writeAppAttemptShuffleMergeInfo( + String appId, + int appAttemptId, + int shuffleId, + int shuffleMergeId) { +if (db != null) { + // Write AppAttemptShuffleMergeId into LevelDB for finalized shuffles + try { +byte[] dbKey = getDbAppAttemptShufflePartitionKey( +new AppAttemptShuffleMergeId(appId, appAttemptId, shuffleId, shuffleMergeId)); +db.put(dbKey, new byte[0]); + } catch (Exception e) { +logger.error("Error saving active app shuffle partition", e); + } +} + } + + private <T> T parseDbKey(String key, String prefix, Class<T> valueType) throws IOException { +if (!key.startsWith(prefix + DB_KEY_DELIMITER)) { + throw new IllegalArgumentException("expected a string starting with " + prefix); +} +String json = key.substring(prefix.length() + 1); +return mapper.readValue(json, valueType); + } + + private AppAttemptId parseDbAppAttemptPathsKey(String key) throws IOException { +return parseDbKey(key, APP_ATTEMPT_PATH_KEY_PREFIX, AppAttemptId.class); + } + + private AppAttemptShuffleMergeId parseDbAppAttemptShufflePartitionKey( + String key, + String prefix) throws IOException { +return parseDbKey(key, prefix, AppAttemptShuffleMergeId.class); + } + + private byte[] getDbKey(Object key, String prefix) throws IOException { +// we stick a common prefix on all the keys so we can find them in the DB +String keyJsonString = prefix + DB_KEY_DELIMITER + mapper.writeValueAsString(key); +return keyJsonString.getBytes(StandardCharsets.UTF_8); + } + + private byte[] getDbAppAttemptShufflePartitionKey( + AppAttemptShuffleMergeId appAttemptShuffleMergeId) throws IOException { +return getDbKey(appAttemptShuffleMergeId, APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX); + } + + private byte[] getDbAppAttemptPathsKey(AppAttemptId appAttemptId) throws IOException { +return getDbKey(appAttemptId, APP_ATTEMPT_PATH_KEY_PREFIX); + } + + @VisibleForTesting + void reloadAppShuffleInfo(DB db) throws IOException { +logger.info("Reload applications merged shuffle information from DB"); +reloadActiveAppAttemptsPathInfo(db); +reloadFinalizedAppAttemptsShuffleMergeInfo(db); + } + + private void reloadActiveAppAttemptsPathInfo(DB db) throws IOException { +if (db != null) { + DBIterator itr = db.iterator(); + itr.seek(APP_ATTEMPT_PATH_KEY_PREFIX.getBytes(StandardCharsets.UTF_8)); + while (itr.hasNext()) { +Map.Entry<byte[], byte[]> e = itr.next(); +String key = new String(e.getKey(), StandardCharsets.UTF_8); +if (!key.startsWith(APP_ATTEMPT_PATH_KEY_PREFIX)) { + break; +} +AppAttemptId appAttemptId = parseDbAppAttemptPathsKey(key); +try { + AppPathsInfo appPathsInfo = mapper.readValue(e.getValue(), AppPathsInfo.class); + logger.info("Reloading active application {}_{} merged shuffle files paths", + appAttemptId.appId, appAttemptId.attemptId); + appsShuffleInfo.compute(appAttemptId.appId, + (appId, existingAppShuffleInfo) -> { +if (existingAppShuffleInfo == null || +existingAppShuffleInfo.attemptId < appAttemptId.attemptId) { + return new AppShuffleInfo( + appAttemptId.appId, appAttemptId.attemptId, appPathsInfo); +} else { + return existingAppShuffleInfo; +} + }); +} catch (Exception exception) { + logger.error("Parsing exception is {}", exception); +} + } +} + } + + private void reloadFinalizedAppAttemptsShuffleMergeInfo(DB db) throws IOException { +if (db != null) { + DBIterator itr = db.iterator(); + itr.seek(APP_ATTEMPT_SHUFFLE_FIN
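The patch quoted above builds every LevelDB key from a shared string prefix, a delimiter, and a JSON-serialized id, so that all entries of one kind can be recovered with a single prefix scan at service restart. Below is a minimal sketch of that pattern using a sorted in-memory map in place of LevelDB; the names and the ";" delimiter are illustrative stand-ins, not the actual Spark constants.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class PrefixScan {
    static final String DELIM = ";";

    // Encode: prefix + delimiter + payload, mirroring the quoted getDbKey().
    static String encode(String prefix, String payload) {
        return prefix + DELIM + payload;
    }

    // Decode: strip "prefix + delimiter", rejecting keys of a different kind.
    // Like the quoted parseDbKey(), this assumes a single-character delimiter.
    static String decode(String key, String prefix) {
        if (!key.startsWith(prefix + DELIM)) {
            throw new IllegalArgumentException("expected a string starting with " + prefix);
        }
        return key.substring(prefix.length() + 1);
    }

    // Scan: a TreeMap's sorted tailMap plays the role of DBIterator.seek();
    // iteration stops at the first key that no longer carries the prefix.
    static List<String> scan(TreeMap<String, String> db, String prefix) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, String> e : db.tailMap(prefix).entrySet()) {
            if (!e.getKey().startsWith(prefix)) {
                break;
            }
            out.add(decode(e.getKey(), prefix));
        }
        return out;
    }

    public static void main(String[] args) {
        TreeMap<String, String> db = new TreeMap<>();
        db.put(encode("AppAttemptPathInfo", "{\"appId\":\"app-1\"}"), "");
        db.put(encode("FinalizedShuffle", "{\"shuffleId\":0}"), "");
        // Only entries under the requested prefix come back.
        System.out.println(scan(db, "AppAttemptPathInfo"));
    }
}
```

Because keys are stored in sorted byte order, seeking to the prefix and breaking at the first non-matching key visits exactly the entries of one kind, which is what makes the reload-on-restart pass cheap.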
[GitHub] [spark] zhouyejoe commented on a diff in pull request #35906: [SPARK-33236][shuffle] Enable Push-based shuffle service to store state in NM level DB for work preserving restart
zhouyejoe commented on code in PR #35906: URL: https://github.com/apache/spark/pull/35906#discussion_r888299709 ## common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java: ## @@ -655,6 +744,156 @@ public void registerExecutor(String appId, ExecutorShuffleInfo executorInfo) { ...
[GitHub] [spark] zhouyejoe commented on a diff in pull request #35906: [SPARK-33236][shuffle] Enable Push-based shuffle service to store state in NM level DB for work preserving restart
zhouyejoe commented on code in PR #35906: URL: https://github.com/apache/spark/pull/35906#discussion_r888299473 ## common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java: ## @@ -655,6 +744,156 @@ public void registerExecutor(String appId, ExecutorShuffleInfo executorInfo) { ... + return new AppShuffleInfo( + appAttemptId.appId, appAttemptId.attemptId, appPathsInfo); +} else { + return existingAppShuffleInfo; Review Comment: Added.
[GitHub] [spark] zhouyejoe commented on a diff in pull request #35906: [SPARK-33236][shuffle] Enable Push-based shuffle service to store state in NM level DB for work preserving restart
zhouyejoe commented on code in PR #35906: URL: https://github.com/apache/spark/pull/35906#discussion_r888299188 ## common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java: ## @@ -992,6 +1233,45 @@ AppShufflePartitionInfo getPartitionInfo() { } } + /** + * Simply encodes an application attempt ID. + */ + public static class AppAttemptId { Review Comment: Removed all the equals and hashCode methods from these JSON bean classes. Are they required?
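On the equals/hashCode question above: JSON (de)serialization by itself does not need them, but they become necessary as soon as instances are used as keys in a hash-based map, which is the typical role of id beans like these. A quick illustration with a hypothetical stand-in class (not the Spark code):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class BeanEquality {
    // Hypothetical stand-in for an id bean such as AppAttemptId.
    static final class AttemptId {
        final String appId;
        final int attemptId;

        AttemptId(String appId, int attemptId) {
            this.appId = appId;
            this.attemptId = attemptId;
        }

        // Without these overrides, two logically equal ids would compare by
        // identity, hash to different buckets, and map lookups would miss.
        @Override
        public boolean equals(Object o) {
            if (!(o instanceof AttemptId)) {
                return false;
            }
            AttemptId other = (AttemptId) o;
            return attemptId == other.attemptId && Objects.equals(appId, other.appId);
        }

        @Override
        public int hashCode() {
            return Objects.hash(appId, attemptId);
        }
    }

    public static void main(String[] args) {
        Map<AttemptId, String> paths = new HashMap<>();
        paths.put(new AttemptId("app-1", 1), "/local/dir/app-1_1");
        // Lookup with a *new* but logically equal key succeeds only because
        // equals/hashCode are overridden.
        System.out.println(paths.get(new AttemptId("app-1", 1)));
    }
}
```

So whether the overrides can be dropped depends on whether the beans are only ever serialized, or also used for map lookups and value-based comparisons such as those in tests.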
[GitHub] [spark] zhouyejoe commented on a diff in pull request #35906: [SPARK-33236][shuffle] Enable Push-based shuffle service to store state in NM level DB for work preserving restart
zhouyejoe commented on code in PR #35906: URL: https://github.com/apache/spark/pull/35906#discussion_r888298796 ## common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java: ## @@ -576,6 +661,7 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) { } finally { partition.closeAllFilesAndDeleteIfNeeded(false); } + cleanUpAppShufflePartitionInfoInDB(partition.appAttemptShuffleMergeId); Review Comment: Removed the cleanup though.
[GitHub] [spark] zhouyejoe commented on a diff in pull request #35906: [SPARK-33236][shuffle] Enable Push-based shuffle service to store state in NM level DB for work preserving restart
zhouyejoe commented on code in PR #35906: URL: https://github.com/apache/spark/pull/35906#discussion_r888298391 ## common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java: ## @@ -576,6 +661,7 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) { } finally { partition.closeAllFilesAndDeleteIfNeeded(false); } + cleanUpAppShufflePartitionInfoInDB(partition.appAttemptShuffleMergeId); Review Comment: Test to be added
[GitHub] [spark] zhouyejoe commented on a diff in pull request #35906: [SPARK-33236][shuffle] Enable Push-based shuffle service to store state in NM level DB for work preserving restart
zhouyejoe commented on code in PR #35906: URL: https://github.com/apache/spark/pull/35906#discussion_r888298248 ## common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java: ## @@ -209,9 +246,16 @@ private AppShufflePartitionInfo getOrCreateAppShufflePartitionInfo( appShuffleInfo.getMergedShuffleIndexFilePath(shuffleId, shuffleMergeId, reduceId)); File metaFile = appShuffleInfo.getMergedShuffleMetaFile(shuffleId, shuffleMergeId, reduceId); + // Make sure unuseful non-finalized merged data/index/meta files get cleaned up + // during service restart + if (dataFile.exists()) dataFile.delete(); + if (indexFile.exists()) indexFile.delete(); + if (metaFile.exists()) metaFile.delete(); Review Comment: Removed and added comments
[GitHub] [spark] zhouyejoe commented on pull request #35906: [SPARK-33236][shuffle] Enable Push-based shuffle service to store state in NM level DB for work preserving restart
zhouyejoe commented on PR #35906: URL: https://github.com/apache/spark/pull/35906#issuecomment-1145216328 > Added a flag in closeAndDeletePartitionFilesIfNeeded to check whether DB cleanup is needed or not.
[GitHub] [spark] MaxGekk commented on a diff in pull request #36714: [SPARK-39320][SQL] Support aggregate function `MEDIAN`
MaxGekk commented on code in PR #36714: URL: https://github.com/apache/spark/pull/36714#discussion_r888289663 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/percentiles.scala: ## @@ -359,6 +359,32 @@ case class Percentile( ) } +// scalastyle:off line.size.limit +@ExpressionDescription( + usage = "_FUNC_(col) - Returns the median of numeric or ansi interval column `col`.", + examples = """ +Examples: + > SELECT _FUNC_(col) FROM VALUES (0), (10) AS tab(col); + 5.0 + > SELECT _FUNC_(col) FROM VALUES (INTERVAL '0' MONTH), (INTERVAL '10' MONTH) AS tab(col); + 5.0 + """, + group = "agg_funcs", + since = "3.4.0") +// scalastyle:on line.size.limit +case class Median(child: Expression) + extends AggregateFunction +with RuntimeReplaceableAggregate +with ImplicitCastInputTypes +with UnaryLike[Expression] { Review Comment: Should be aligned to `extends`, see https://github.com/databricks/scala-style-guide#spacing-and-indentation ```suggestion with RuntimeReplaceableAggregate with ImplicitCastInputTypes with UnaryLike[Expression] { ```
[GitHub] [spark] MaxGekk commented on a diff in pull request #36714: [SPARK-39320][SQL] Support aggregate function `MEDIAN`
MaxGekk commented on code in PR #36714: URL: https://github.com/apache/spark/pull/36714#discussion_r888289663 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/percentiles.scala: ## @@ -359,6 +359,32 @@ case class Percentile( ) } +// scalastyle:off line.size.limit Review Comment: Is this really needed?
[GitHub] [spark] olaky opened a new pull request, #36753: [SPARK-39259] Evaluate timestamps consistently in subqueries
olaky opened a new pull request, #36753: URL: https://github.com/apache/spark/pull/36753 ### What changes were proposed in this pull request? ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? ### How was this patch tested?
[GitHub] [spark] olaky opened a new pull request, #36752: [SPARK-39259] Evaluate timestamps consistently in subqueries
olaky opened a new pull request, #36752: URL: https://github.com/apache/spark/pull/36752 ### What changes were proposed in this pull request? ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? ### How was this patch tested?
[GitHub] [spark] zhouyejoe commented on a diff in pull request #35906: [SPARK-33236][shuffle] Enable Push-based shuffle service to store state in NM level DB for work preserving restart
zhouyejoe commented on code in PR #35906: URL: https://github.com/apache/spark/pull/35906#discussion_r888286186 ## common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java: ## @@ -536,9 +619,11 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) { } } // Even when the mergePartitionsInfo is null, we mark the shuffle as finalized but the results - // sent to the driver will be empty. This cam happen when the service didn't receive any + // sent to the driver will be empty. This can happen when the service didn't receive any // blocks for the shuffle yet and the driver didn't wait for enough time to finalize the // shuffle. + writeAppAttemptShuffleMergeInfo( Review Comment: Moved the write prior to the final return in this method.
[GitHub] [spark] zhouyejoe commented on a diff in pull request #35906: [SPARK-33236][shuffle] Enable Push-based shuffle service to store state in NM level DB for work preserving restart
zhouyejoe commented on code in PR #35906: URL: https://github.com/apache/spark/pull/35906#discussion_r888284976 ## common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java: ## @@ -342,6 +389,29 @@ void closeAndDeletePartitionFilesIfNeeded( if (cleanupLocalDirs) { deleteExecutorDirs(appShuffleInfo); } +cleanUpAppShuffleInfoInDB(appShuffleInfo); Review Comment: Yes, applicationRemoved will trigger the DB deletion of AppAttemptPathsInfo directly. But the merged shuffle metadata will still be deleted asynchronously with the cleanupExecutor.
[GitHub] [spark] dtenedor commented on pull request #36672: [SPARK-39265][SQL] Support vectorized Parquet scans with DEFAULT values
dtenedor commented on PR #36672: URL: https://github.com/apache/spark/pull/36672#issuecomment-1145200667 @HyukjinKwon the CI passes now :)
[GitHub] [spark] MaxGekk closed pull request #36749: [SPARK-39295][DOCS][PYTHON][3.3] Improve documentation of pandas API supported list
MaxGekk closed pull request #36749: [SPARK-39295][DOCS][PYTHON][3.3] Improve documentation of pandas API supported list URL: https://github.com/apache/spark/pull/36749
[GitHub] [spark] MaxGekk commented on pull request #36749: [SPARK-39295][DOCS][PYTHON][3.3] Improve documentation of pandas API supported list
MaxGekk commented on PR #36749: URL: https://github.com/apache/spark/pull/36749#issuecomment-1145199197 +1, LGTM. Merging to 3.3. Thank you, @beobest2 and @HyukjinKwon @Yikun for review.
[GitHub] [spark] MaxGekk commented on pull request #36654: [SPARK-39259][SQL] Evaluate timestamps consistently in subqueries
MaxGekk commented on PR #36654: URL: https://github.com/apache/spark/pull/36654#issuecomment-1145197543 @olaky Could you open separate PRs with backports to branch-3.3 and branch-3.2 (according to SPARK-39259, 3.2 has this issue)? Congratulations on your first contribution to Apache Spark, and welcome to the Spark community!
[GitHub] [spark] MaxGekk closed pull request #36654: [SPARK-39259][SQL] Evaluate timestamps consistently in subqueries
MaxGekk closed pull request #36654: [SPARK-39259][SQL] Evaluate timestamps consistently in subqueries URL: https://github.com/apache/spark/pull/36654
[GitHub] [spark] MaxGekk commented on pull request #36654: [SPARK-39259][SQL] Evaluate timestamps consistently in subqueries
MaxGekk commented on PR #36654: URL: https://github.com/apache/spark/pull/36654#issuecomment-1145192594 +1, LGTM. Merging to master, 3.3, 3.2. Thank you, @olaky.
[GitHub] [spark] akpatnam25 commented on a diff in pull request #36734: [SPARK-38987][SHUFFLE] Throw FetchFailedException when merged shuffle blocks are corrupted and spark.shuffle.detectCorrupt is set to true
akpatnam25 commented on code in PR #36734: URL: https://github.com/apache/spark/pull/36734#discussion_r888273505 ## core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala: ## @@ -4402,12 +4501,20 @@ object DAGSchedulerSuite { def makeMapStatus(host: String, reduces: Int, sizes: Byte = 2, mapTaskId: Long = -1): MapStatus = MapStatus(makeBlockManagerId(host), Array.fill[Long](reduces)(sizes), mapTaskId) - def makeBlockManagerId(host: String): BlockManagerId = { -BlockManagerId(host + "-exec", host, 12345) + def makeBlockManagerId(host: String, isShufflePushMerger: Boolean = false): BlockManagerId = { Review Comment: Agreed, changed it to accept an optional argument
[GitHub] [spark] viirya commented on pull request #36750: [SPARK-29260][SQL] Support alter database location for Hive client versions other than 3.0/3.1
viirya commented on PR #36750: URL: https://github.com/apache/spark/pull/36750#issuecomment-1145188518 `AlterNamespaceSetLocationSuite` seems to have failed.
[GitHub] [spark] attilapiros commented on a diff in pull request #36512: [SPARK-39152][CORE] Deregistering disk persisted local blocks in case of IO related errors
attilapiros commented on code in PR #36512: URL: https://github.com/apache/spark/pull/36512#discussion_r888265229 ## core/src/main/scala/org/apache/spark/storage/BlockManager.scala: ## @@ -933,46 +935,56 @@ private[spark] class BlockManager( }) Some(new BlockResult(ci, DataReadMethod.Memory, info.size)) } else if (level.useDisk && diskStore.contains(blockId)) { + var diskData: BlockData = null try { -val diskData = diskStore.getBytes(blockId) -val iterToReturn: Iterator[Any] = { - if (level.deserialized) { -val diskValues = serializerManager.dataDeserializeStream( - blockId, - diskData.toInputStream())(info.classTag) -maybeCacheDiskValuesInMemory(info, blockId, level, diskValues) - } else { -val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData) - .map { _.toInputStream(dispose = false) } - .getOrElse { diskData.toInputStream() } -serializerManager.dataDeserializeStream(blockId, stream)(info.classTag) - } +diskData = diskStore.getBytes(blockId) +val iterToReturn = if (level.deserialized) { + val diskValues = serializerManager.dataDeserializeStream( +blockId, +diskData.toInputStream())(info.classTag) + maybeCacheDiskValuesInMemory(info, blockId, level, diskValues) +} else { + val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData) +.map { _.toInputStream(dispose = false) } +.getOrElse { diskData.toInputStream() } + serializerManager.dataDeserializeStream(blockId, stream)(info.classTag) } val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, { releaseLockAndDispose(blockId, diskData, taskContext) }) Some(new BlockResult(ci, DataReadMethod.Disk, info.size)) } catch { -case ex: KryoException if ex.getCause.isInstanceOf[IOException] => - // We need to have detailed log message to catch environmental problems easily. 
- // Further details: https://issues.apache.org/jira/browse/SPARK-37710 - processKryoException(ex, blockId) - throw ex +case t: Throwable => + if (diskData != null) { +diskData.dispose() +diskData = null + } + releaseLock(blockId, taskContext) + if (isIORelatedException(t)) { +logInfo(extendMessageWithBlockDetails(t.getMessage, blockId)) +// Remove the block so that its unavailability is reported to the driver +removeBlock(blockId) Review Comment: I have updated the description and the title. @Ngone51 is there anything else?
[GitHub] [spark] otterc commented on a diff in pull request #35906: [SPARK-33236][shuffle] Enable Push-based shuffle service to store state in NM level DB for work preserving restart
otterc commented on code in PR #35906: URL: https://github.com/apache/spark/pull/35906#discussion_r888251830 ## common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java: ## @@ -342,6 +389,29 @@ void closeAndDeletePartitionFilesIfNeeded( if (cleanupLocalDirs) { deleteExecutorDirs(appShuffleInfo); } +cleanUpAppShuffleInfoInDB(appShuffleInfo); Review Comment: Could you please clarify the proposal? > Do we want to delete the app attempt paths immediately, and do the shuffle deletes async (along with path deletes like here) ? Are we saying that the app attempt path metadata saved in the DB should be cleaned up immediately, while the merged shuffle metadata saved in the DB would be deleted asynchronously?
[GitHub] [spark] otterc commented on a diff in pull request #36734: [SPARK-38987][SHUFFLE] Throw FetchFailedException when merged shuffle blocks are corrupted and spark.shuffle.detectCorrupt is set to true
otterc commented on code in PR #36734: URL: https://github.com/apache/spark/pull/36734#discussion_r888231614 ## core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala: ## @@ -4402,12 +4501,20 @@ object DAGSchedulerSuite { def makeMapStatus(host: String, reduces: Int, sizes: Byte = 2, mapTaskId: Long = -1): MapStatus = MapStatus(makeBlockManagerId(host), Array.fill[Long](reduces)(sizes), mapTaskId) - def makeBlockManagerId(host: String): BlockManagerId = { -BlockManagerId(host + "-exec", host, 12345) + def makeBlockManagerId(host: String, isShufflePushMerger: Boolean = false): BlockManagerId = { Review Comment: How about adding execId as an optional argument? That would prevent future changes to this method.
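The suggestion above is the standard default-argument refactor: let callers override the executor id instead of adding a new boolean flag for each special case. A minimal Python analogue (hypothetical names and values, not Spark's actual Scala helper) might look like:

```python
def make_block_manager_id(host, exec_id=None):
    """Build an (exec_id, host, port) triple for tests.

    The default reproduces the original behavior (host + "-exec");
    callers with special needs, e.g. a shuffle-push merger, pass
    their own exec_id rather than forcing another signature change.
    """
    if exec_id is None:
        exec_id = host + "-exec"
    return (exec_id, host, 12345)
```

Existing call sites keep working unchanged, while new tests can supply any executor id they need.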
[GitHub] [spark] hvanhovell commented on pull request #36751: [WIP][SPARK-39336][CORE] Do not release write locks on task end.
hvanhovell commented on PR #36751: URL: https://github.com/apache/spark/pull/36751#issuecomment-1145135863 This is still a WIP. If we think this is the right thing to do, then I will add some tests.
[GitHub] [spark] hvanhovell opened a new pull request, #36751: [WIP][SPARK-39336][CORE] Do not release write locks on task end.
hvanhovell opened a new pull request, #36751: URL: https://github.com/apache/spark/pull/36751 ### What changes were proposed in this pull request? This PR removes the unlocking of write locks on task end from the `BlockInfoManager`. ### Why are the changes needed? The `BlockInfoManager` releases all locks held by a task when the task is done, including write locks. The problem is that a thread (other than the main task thread) might still be modifying the block; releasing the write lock makes the block appear readable, and a reader might observe it in a partial or non-existent state. This is a follow-up to https://github.com/apache/spark/pull/35991. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing tests should pass. I can add tests if folks are on board with this
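The hazard described in the PR can be sketched as a toy Python program (a hypothetical illustration, not Spark's `BlockInfoManager`; events force the unlucky interleaving deterministically):

```python
import threading

block = {"data": None, "complete": False}
lock = threading.Lock()                # stands in for the block's write lock
lock_dropped = threading.Event()
resume_writer = threading.Event()

def writer():
    lock.acquire()
    block["data"] = "partial"          # helper thread is mid-write...
    lock.release()                     # ..."task end" force-releases the lock anyway
    lock_dropped.set()
    resume_writer.wait()               # helper keeps writing after the release
    block["data"] = "complete"
    block["complete"] = True

t = threading.Thread(target=writer)
t.start()
lock_dropped.wait()
with lock:                             # reader takes the now-available lock
    observed = dict(block)             # and sees a half-written block
resume_writer.set()
t.join()
print(observed)                        # -> {'data': 'partial', 'complete': False}
```

Keeping the write lock held until the writer actually finishes (rather than releasing it at task end) is what prevents the reader from observing the partial state.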
[GitHub] [spark] dongjoon-hyun commented on pull request #36750: [SPARK-29260][SQL] Support alter database location for Hive client versions other than 3.0/3.1
dongjoon-hyun commented on PR #36750: URL: https://github.com/apache/spark/pull/36750#issuecomment-1145110352 Thank you for pinging me, @sunchao