[GitHub] [spark] pralabhkumar commented on a diff in pull request #36701: [SPARK-39179][PYTHON][TESTS] Improve the test coverage for pyspark/shuffle.py

2022-06-02 Thread GitBox


pralabhkumar commented on code in PR #36701:
URL: https://github.com/apache/spark/pull/36701#discussion_r888632502


##
python/pyspark/tests/test_shuffle.py:
##
@@ -54,6 +63,49 @@ def test_medium_dataset(self):
 self.assertTrue(m.spills >= 1)
 self.assertEqual(sum(sum(v) for k, v in m.items()), sum(range(self.N)) * 3)
 
+def test_shuffle_data_with_multiple_locations(self):
+# SPARK-39179: Test shuffle of data with multiple location also check
+# shuffle locations get randomized
+
+with tempfile.TemporaryDirectory() as tempdir1, tempfile.TemporaryDirectory() as tempdir2:
+os.environ["SPARK_LOCAL_DIRS"] = tempdir1 + "," + tempdir2
+index_of_tempdir1 = [False, False]
+for idx in range(10):
+m = ExternalMerger(self.agg, 20)
+if m.localdirs[0].startswith(tempdir1):
+index_of_tempdir1[0] = True
+elif m.localdirs[1].startswith(tempdir1):
+index_of_tempdir1[1] = True
+m.mergeValues(self.data)
+self.assertTrue(m.spills >= 1)
+self.assertEqual(sum(sum(v) for k, v in m.items()), sum(range(self.N)))
+self.assertTrue(index_of_tempdir1[0] and (index_of_tempdir1[0] == index_of_tempdir1[1]))
+del os.environ["SPARK_LOCAL_DIRS"]

Review Comment:
   @HyukjinKwon , sorry, I didn't completely understand your suggestion. Please help.
   Originally SPARK_LOCAL_DIRS is not set as an environment variable (it has no default value). So it was set as part of this test, and once the test case completed it was removed at the end of the test case, so that the rest of the cases work as-is. Let me know if I am missing anything.






[GitHub] [spark] mridulm commented on a diff in pull request #36734: [SPARK-38987][SHUFFLE] Throw FetchFailedException when merged shuffle blocks are corrupted and spark.shuffle.detectCorrupt is set to true

2022-06-02 Thread GitBox


mridulm commented on code in PR #36734:
URL: https://github.com/apache/spark/pull/36734#discussion_r888654676


##
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##
@@ -1885,6 +1885,16 @@ private[spark] class DAGScheduler(
   mapOutputTracker.
 unregisterMergeResult(shuffleId, reduceId, bmAddress, Option(mapIndex))
 }
+  } else {
+// Unregister the merge result of  if there is a FetchFailed event
+// and is not a  MetaDataFetchException which is signified by bmAddress being null
+if (bmAddress != null
+  && bmAddress.executorId.equals(BlockManagerId.SHUFFLE_MERGER_IDENTIFIER)) {

Review Comment:
   nit: Move the `&&` to the previous line.
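   For illustration only, a minimal sketch of the layout the nit asks for (the condition is taken from the diff above):
   ```scala
   if (bmAddress != null &&
       bmAddress.executorId.equals(BlockManagerId.SHUFFLE_MERGER_IDENTIFIER)) {
     // handle the merged-shuffle-block fetch failure here
   }
   ```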



##
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##
@@ -1885,6 +1885,16 @@ private[spark] class DAGScheduler(
   mapOutputTracker.
 unregisterMergeResult(shuffleId, reduceId, bmAddress, Option(mapIndex))
 }
+  } else {
+// Unregister the merge result of  if there is a FetchFailed event
+// and is not a  MetaDataFetchException which is signified by bmAddress being null
+if (bmAddress != null
+  && bmAddress.executorId.equals(BlockManagerId.SHUFFLE_MERGER_IDENTIFIER)) {
+  assert(pushBasedShuffleEnabled, "Pushed based shuffle needs to " +
+"be enabled so that merge results are present.")

Review Comment:
   `Pushed based shuffle` -> `Push based shuffle`
   
   Change message to something like `Push based shuffle expected to be enabled 
when handling merge block fetch failure` ? To make it clear what we are 
expecting the condition to be for.
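   A minimal sketch of the reworded assertion, combining both suggestions above (the exact wording is only illustrative):
   ```scala
   assert(pushBasedShuffleEnabled, "Push based shuffle expected to be enabled " +
     "when handling merge block fetch failure")
   ```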






[GitHub] [spark] MaxGekk closed pull request #36708: [SPARK-37623][SQL] Support ANSI Aggregate Function: regr_intercept

2022-06-02 Thread GitBox


MaxGekk closed pull request #36708: [SPARK-37623][SQL] Support ANSI Aggregate 
Function: regr_intercept
URL: https://github.com/apache/spark/pull/36708





[GitHub] [spark] MaxGekk commented on pull request #36708: [SPARK-37623][SQL] Support ANSI Aggregate Function: regr_intercept

2022-06-02 Thread GitBox


MaxGekk commented on PR #36708:
URL: https://github.com/apache/spark/pull/36708#issuecomment-1145630828

   +1, LGTM. Merging to master.
   Thank you, @beliefer and @cloud-fan for review.





[GitHub] [spark] MaxGekk closed pull request #36752: [SPARK-39259][SQL][3.3] Evaluate timestamps consistently in subqueries

2022-06-02 Thread GitBox


MaxGekk closed pull request #36752: [SPARK-39259][SQL][3.3] Evaluate timestamps 
consistently in subqueries
URL: https://github.com/apache/spark/pull/36752





[GitHub] [spark] MaxGekk commented on pull request #36752: [SPARK-39259][SQL][3.3] Evaluate timestamps consistently in subqueries

2022-06-02 Thread GitBox


MaxGekk commented on PR #36752:
URL: https://github.com/apache/spark/pull/36752#issuecomment-1145620433

   +1, LGTM. Merging to 3.3.
   Thank you, @olaky.





[GitHub] [spark] xuanyuanking opened a new pull request, #36757: [SPARK-39371][DOCS][Core] Review and fix issues in Scala/Java API docs of Core module #36754

2022-06-02 Thread GitBox


xuanyuanking opened a new pull request, #36757:
URL: https://github.com/apache/spark/pull/36757

   
   
   ### What changes were proposed in this pull request?
   
   Compare the 3.3.0 API doc with the latest release version 3.2.1. Fix the 
following issues:
   
   * Add missing Since annotation for new APIs
   * Remove the leaking class/object in API doc
   
   ### Why are the changes needed?
   
   Improve API docs
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   No
   
   ### How was this patch tested?
   
   Existing UT





[GitHub] [spark] pralabhkumar commented on a diff in pull request #36701: [SPARK-39179][PYTHON][TESTS] Improve the test coverage for pyspark/shuffle.py

2022-06-02 Thread GitBox


pralabhkumar commented on code in PR #36701:
URL: https://github.com/apache/spark/pull/36701#discussion_r888632502


##
python/pyspark/tests/test_shuffle.py:
##
@@ -54,6 +63,49 @@ def test_medium_dataset(self):
 self.assertTrue(m.spills >= 1)
 self.assertEqual(sum(sum(v) for k, v in m.items()), sum(range(self.N)) * 3)
 
+def test_shuffle_data_with_multiple_locations(self):
+# SPARK-39179: Test shuffle of data with multiple location also check
+# shuffle locations get randomized
+
+with tempfile.TemporaryDirectory() as tempdir1, tempfile.TemporaryDirectory() as tempdir2:
+os.environ["SPARK_LOCAL_DIRS"] = tempdir1 + "," + tempdir2
+index_of_tempdir1 = [False, False]
+for idx in range(10):
+m = ExternalMerger(self.agg, 20)
+if m.localdirs[0].startswith(tempdir1):
+index_of_tempdir1[0] = True
+elif m.localdirs[1].startswith(tempdir1):
+index_of_tempdir1[1] = True
+m.mergeValues(self.data)
+self.assertTrue(m.spills >= 1)
+self.assertEqual(sum(sum(v) for k, v in m.items()), sum(range(self.N)))
+self.assertTrue(index_of_tempdir1[0] and (index_of_tempdir1[0] == index_of_tempdir1[1]))
+del os.environ["SPARK_LOCAL_DIRS"]

Review Comment:
   @HyukjinKwon , sorry, I didn't understand it completely. Please help.
   Originally SPARK_LOCAL_DIRS is not set as an environment variable (it has no default value). So it was set as part of this test, and once the test case completed it was removed at the end of the test case, so that the rest of the cases work as-is. Let me know if I am missing anything.






[GitHub] [spark] pralabhkumar commented on a diff in pull request #36701: [SPARK-39179][PYTHON][TESTS] Improve the test coverage for pyspark/shuffle.py

2022-06-02 Thread GitBox


pralabhkumar commented on code in PR #36701:
URL: https://github.com/apache/spark/pull/36701#discussion_r888632502


##
python/pyspark/tests/test_shuffle.py:
##
@@ -54,6 +63,49 @@ def test_medium_dataset(self):
 self.assertTrue(m.spills >= 1)
 self.assertEqual(sum(sum(v) for k, v in m.items()), sum(range(self.N)) * 3)
 
+def test_shuffle_data_with_multiple_locations(self):
+# SPARK-39179: Test shuffle of data with multiple location also check
+# shuffle locations get randomized
+
+with tempfile.TemporaryDirectory() as tempdir1, tempfile.TemporaryDirectory() as tempdir2:
+os.environ["SPARK_LOCAL_DIRS"] = tempdir1 + "," + tempdir2
+index_of_tempdir1 = [False, False]
+for idx in range(10):
+m = ExternalMerger(self.agg, 20)
+if m.localdirs[0].startswith(tempdir1):
+index_of_tempdir1[0] = True
+elif m.localdirs[1].startswith(tempdir1):
+index_of_tempdir1[1] = True
+m.mergeValues(self.data)
+self.assertTrue(m.spills >= 1)
+self.assertEqual(sum(sum(v) for k, v in m.items()), sum(range(self.N)))
+self.assertTrue(index_of_tempdir1[0] and (index_of_tempdir1[0] == index_of_tempdir1[1]))
+del os.environ["SPARK_LOCAL_DIRS"]

Review Comment:
   @HyukjinKwon , sorry, I didn't understand it completely. Please help.
   Originally SPARK_LOCAL_DIRS is not set as an environment variable (it has no default value). So it was set as part of this test, and once the test case completed it was removed at the end of the test case, so that the rest of the cases work as-is.






[GitHub] [spark] sadikovi commented on a diff in pull request #36726: [SPARK-39339][SQL] Support TimestampNTZ type in JDBC data source

2022-06-02 Thread GitBox


sadikovi commented on code in PR #36726:
URL: https://github.com/apache/spark/pull/36726#discussion_r887638039


##
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala:
##
@@ -150,6 +150,9 @@ object JdbcUtils extends Logging with SQLConfHelper {
   case StringType => Option(JdbcType("TEXT", java.sql.Types.CLOB))
   case BinaryType => Option(JdbcType("BLOB", java.sql.Types.BLOB))
   case TimestampType => Option(JdbcType("TIMESTAMP", java.sql.Types.TIMESTAMP))
+  // Most of the databases either don't support TIMESTAMP WITHOUT TIME ZONE or map it to
+  // TIMESTAMP type. This will be overwritten in dialects.
+  case TimestampNTZType => Option(JdbcType("TIMESTAMP", java.sql.Types.TIMESTAMP))

Review Comment:
   This is a common use case of treating TIMESTAMP as timestamp without time 
zone. JDBC dialects can override this setting if need be. For example, SQL 
Server uses DATETIME instead. I have verified that most of the jdbc data 
sources work fine with TIMESTAMP. 
   
   I am going to update the comment to elaborate in more detail.
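   For reference, a hedged sketch of what such a dialect-level override could look like; it only illustrates the existing `JdbcDialect.getJDBCType` hook and is not the exact change made in this PR:
   ```scala
   // Sketch only: a dialect such as MsSqlServerDialect could remap TimestampNTZType
   // to DATETIME instead of the common TIMESTAMP default above.
   override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
     case TimestampNTZType => Some(JdbcType("DATETIME", java.sql.Types.TIMESTAMP))
     case _ => None // fall back to the common mappings in JdbcUtils
   }
   ```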






[GitHub] [spark] MaxGekk closed pull request #36714: [SPARK-39320][SQL] Support aggregate function `MEDIAN`

2022-06-02 Thread GitBox


MaxGekk closed pull request #36714: [SPARK-39320][SQL] Support aggregate 
function `MEDIAN`
URL: https://github.com/apache/spark/pull/36714





[GitHub] [spark] sadikovi commented on a diff in pull request #36726: [SPARK-39339][SQL] Support TimestampNTZ type in JDBC data source

2022-06-02 Thread GitBox


sadikovi commented on code in PR #36726:
URL: https://github.com/apache/spark/pull/36726#discussion_r887638039


##
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala:
##
@@ -150,6 +150,9 @@ object JdbcUtils extends Logging with SQLConfHelper {
   case StringType => Option(JdbcType("TEXT", java.sql.Types.CLOB))
   case BinaryType => Option(JdbcType("BLOB", java.sql.Types.BLOB))
   case TimestampType => Option(JdbcType("TIMESTAMP", java.sql.Types.TIMESTAMP))
+  // Most of the databases either don't support TIMESTAMP WITHOUT TIME ZONE or map it to
+  // TIMESTAMP type. This will be overwritten in dialects.
+  case TimestampNTZType => Option(JdbcType("TIMESTAMP", java.sql.Types.TIMESTAMP))

Review Comment:
   This is a common use case of treating TIMESTAMP as timestamp without time 
zone. JDBC dialects can override this setting if need be. For example, SQL 
Server uses DATETIME instead. I have verified that most of the jdbc data 
sources work fine with TIMESTAMP. 
   
   I left a comment above to explain.






[GitHub] [spark] AmplabJenkins commented on pull request #36740: [SPARK-39355][SQL] Avoid UnresolvedAttribute.apply throwing ParseException

2022-06-02 Thread GitBox


AmplabJenkins commented on PR #36740:
URL: https://github.com/apache/spark/pull/36740#issuecomment-1145590291

   Can one of the admins verify this patch?





[GitHub] [spark] AmplabJenkins commented on pull request #36741: [SPARK-39357][SQL] Fix pmCache memory leak caused by IsolatedClassLoader

2022-06-02 Thread GitBox


AmplabJenkins commented on PR #36741:
URL: https://github.com/apache/spark/pull/36741#issuecomment-1145590268

   Can one of the admins verify this patch?





[GitHub] [spark] AmplabJenkins commented on pull request #36745: [SPARK-39359][SQL] Restrict DEFAULT columns to allowlist of supported data source types

2022-06-02 Thread GitBox


AmplabJenkins commented on PR #36745:
URL: https://github.com/apache/spark/pull/36745#issuecomment-1145590247

   Can one of the admins verify this patch?





[GitHub] [spark] sadikovi commented on a diff in pull request #36745: [SPARK-39359][SQL] Restrict DEFAULT columns to allowlist of supported data source types

2022-06-02 Thread GitBox


sadikovi commented on code in PR #36745:
URL: https://github.com/apache/spark/pull/36745#discussion_r888606358


##
sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala:
##
@@ -41,21 +41,28 @@ import org.apache.spark.sql.types.{MetadataBuilder, StructField, StructType}
  *
  * We can remove this rule once we implement all the catalog functionality in `V2SessionCatalog`.
  */
-class ResolveSessionCatalog(val catalogManager: CatalogManager)
+class ResolveSessionCatalog(val analyzer: Analyzer)
   extends Rule[LogicalPlan] with LookupCatalog {
+  val catalogManager = analyzer.catalogManager

Review Comment:
   It is up to you to change if you like, but maybe we could move this line 
below imports



##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala:
##
@@ -83,16 +83,26 @@ object ResolveDefaultColumns {
 *
 * @param analyzer  used for analyzing the result of parsing the expression stored as text.
 * @param tableSchema   represents the names and types of the columns of the statement to process.
+   * @param tableProvider provider of the target table to store default values for, if any.
 * @param statementType name of the statement being processed, such as INSERT; useful for errors.
 * @return a copy of `tableSchema` with field metadata updated with the constant-folded values.
 */
   def constantFoldCurrentDefaultsToExistDefaults(
   analyzer: Analyzer,
   tableSchema: StructType,
+  tableProvider: Option[String],
   statementType: String): StructType = {
 if (SQLConf.get.enableDefaultColumns) {
   val newFields: Seq[StructField] = tableSchema.fields.map { field =>
 if (field.metadata.contains(CURRENT_DEFAULT_COLUMN_METADATA_KEY)) {
+  // Make sure that the target table has a provider that supports default column values.
+  val allowedProviders: Array[String] =
+SQLConf.get.getConf(SQLConf.DEFAULT_COLUMN_ALLOWED_PROVIDERS)

Review Comment:
   Maybe we could try to move the code out of the `map` loop? It is not a big 
deal but it could matter for large schemas. For example, we can prepare 
allowedProviders list and then call `map`.
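   For illustration, a rough sketch of the hoisting (the conf parsing shown is an assumption; the point is only that the allow-list is built once rather than once per field):
   ```scala
   // Read and parse the allow-list a single time...
   val allowedProviders: Array[String] =
     SQLConf.get.getConf(SQLConf.DEFAULT_COLUMN_ALLOWED_PROVIDERS).split(",").map(_.trim)
   // ...then reuse it inside the per-field map.
   val newFields: Seq[StructField] = tableSchema.fields.map { field =>
     // per-field logic can consult `allowedProviders` here without re-reading the conf
     field
   }
   ```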






[GitHub] [spark] HyukjinKwon commented on a diff in pull request #36660: [SPARK-39284][PS] Implement Groupby.mad

2022-06-02 Thread GitBox


HyukjinKwon commented on code in PR #36660:
URL: https://github.com/apache/spark/pull/36660#discussion_r888603854


##
python/pyspark/pandas/groupby.py:
##
@@ -759,6 +759,99 @@ def skew(scol: Column) -> Column:
 bool_to_numeric=True,
 )
 
+# TODO: 'axis', 'skipna', 'level' parameter should be implemented.
+def mad(self) -> FrameLike:
+"""
+Compute mean absolute deviation of groups, excluding missing values.
+
+.. versionadded:: 3.4.0
+
+Examples
+
+>>> df = ps.DataFrame({"A": [1, 2, 1, 1], "B": [True, False, False, True],
+..."C": [3, 4, 3, 4], "D": ["a", "b", "b", "a"]})
+
+>>> df.groupby("A").mad()
+  B C
+A
+1  0.44  0.44
+2  0.00  0.00
+
+>>> df.B.groupby(df.A).mad()
+A
+1  0.44
+2  0.00
+Name: B, dtype: float64
+
+See Also
+
+pyspark.pandas.Series.groupby
+pyspark.pandas.DataFrame.groupby
+"""
+
+groupkey_names = [SPARK_INDEX_NAME_FORMAT(i) for i in range(len(self._groupkeys))]
+groupkey_scols = [s.alias(name) for s, name in zip(self._groupkeys_scols, groupkey_names)]
+
+agg_columns = []
+for psser in self._agg_columns:
+if isinstance(psser.spark.data_type, BooleanType):
+agg_columns.append(psser.astype(int))
+elif isinstance(psser.spark.data_type, NumericType):
+agg_columns.append(psser)
+
+sdf = self._psdf._internal.spark_frame.select(
+*groupkey_scols, *[psser.spark.column for psser in agg_columns]
+)
+
+internal = InternalFrame(
+spark_frame=sdf,
+index_spark_columns=[scol_for(sdf, col) for col in groupkey_names],
+index_names=[psser._column_label for psser in self._groupkeys],
+index_fields=[
+psser._internal.data_fields[0].copy(name=name)
+for psser, name in zip(self._groupkeys, groupkey_names)
+],
+data_spark_columns=[
+scol_for(sdf, psser._internal.data_spark_column_names[0]) for psser in agg_columns
+],
+column_labels=[psser._column_label for psser in agg_columns],
+data_fields=[psser._internal.data_fields[0] for psser in agg_columns],
+column_label_names=self._psdf._internal.column_label_names,
+)
+psdf: DataFrame = DataFrame(internal)
+
+if len(psdf._internal.column_labels) > 0:

Review Comment:
   Maybe let's add some comments here.






[GitHub] [spark] sadikovi commented on a diff in pull request #36745: [SPARK-39359][SQL] Restrict DEFAULT columns to allowlist of supported data source types

2022-06-02 Thread GitBox


sadikovi commented on code in PR #36745:
URL: https://github.com/apache/spark/pull/36745#discussion_r888603145


##
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala:
##
@@ -122,7 +122,7 @@ abstract class SessionCatalogSuite extends AnalysisTest with Eventually {
   }
 
   test("create table with default columns") {
-withBasicCatalog { catalog =>
+if (!isHiveExternalCatalog) withBasicCatalog { catalog =>

Review Comment:
   What happens if we remove `if (!isHiveExternalCatalog)`? What is the test 
failure in this case?






[GitHub] [spark] HyukjinKwon commented on a diff in pull request #36660: [SPARK-39284][PS] Implement Groupby.mad

2022-06-02 Thread GitBox


HyukjinKwon commented on code in PR #36660:
URL: https://github.com/apache/spark/pull/36660#discussion_r888603267


##
python/pyspark/pandas/groupby.py:
##
@@ -805,7 +874,7 @@ def all(self, skipna: bool = True) -> FrameLike:
 5  False
 """
 groupkey_names = [SPARK_INDEX_NAME_FORMAT(i) for i in range(len(self._groupkeys))]
-internal, sdf = self._prepare_reduce(groupkey_names)
+internal, _, sdf = self._prepare_reduce(groupkey_names)

Review Comment:
   Hm, I can't follow this change. `_prepare_reduce` seems to return an internal frame and a dataframe. What's the one in the middle?






[GitHub] [spark] wangyum commented on pull request #36750: [SPARK-29260][SQL] Support `ALTER DATABASE SET LOCATION` if HMS supports

2022-06-02 Thread GitBox


wangyum commented on PR #36750:
URL: https://github.com/apache/spark/pull/36750#issuecomment-1145534550

   Merged to master.





[GitHub] [spark] wangyum closed pull request #36750: [SPARK-29260][SQL] Support `ALTER DATABASE SET LOCATION` if HMS supports

2022-06-02 Thread GitBox


wangyum closed pull request #36750: [SPARK-29260][SQL] Support `ALTER DATABASE 
SET LOCATION` if HMS supports
URL: https://github.com/apache/spark/pull/36750





[GitHub] [spark] HyukjinKwon closed pull request #36736: [SPARK-39351][SQL] SHOW CREATE TABLE should redact properties

2022-06-02 Thread GitBox


HyukjinKwon closed pull request #36736: [SPARK-39351][SQL] SHOW CREATE TABLE 
should redact properties
URL: https://github.com/apache/spark/pull/36736





[GitHub] [spark] HyukjinKwon commented on pull request #36736: [SPARK-39351][SQL] SHOW CREATE TABLE should redact properties

2022-06-02 Thread GitBox


HyukjinKwon commented on PR #36736:
URL: https://github.com/apache/spark/pull/36736#issuecomment-1145530299

   Merged to master.





[GitHub] [spark] HyukjinKwon opened a new pull request, #36756: [SPARK-39369][INFRA] Increase the memory for building from 4096 to 5120MB in AppVeyor

2022-06-02 Thread GitBox


HyukjinKwon opened a new pull request, #36756:
URL: https://github.com/apache/spark/pull/36756

   ### What changes were proposed in this pull request?
   
   
https://ci.appveyor.com/project/ApacheSoftwareFoundation/spark/builds/43740704
   
   AppVeyor build is being failed because of the lack of memory. We should 
increase it to make the build pass
   
   ### Why are the changes needed?
   
   To make the build pass.
   
   ### Does this PR introduce _any_ user-facing change?
   
   No, dev/test-only.
   
   ### How was this patch tested?
   
   CI in this PR should test it out.





[GitHub] [spark] wangyum commented on a diff in pull request #36755: [SPARK-39368][SQL] Move `RewritePredicateSubquery` into `InjectRuntimeFilter`

2022-06-02 Thread GitBox


wangyum commented on code in PR #36755:
URL: https://github.com/apache/spark/pull/36755#discussion_r888564235


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala:
##
@@ -288,7 +288,13 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
 case s: Subquery if s.correlated => plan
 case _ if !conf.runtimeFilterSemiJoinReductionEnabled &&
   !conf.runtimeFilterBloomFilterEnabled => plan
-case _ => tryInjectRuntimeFilter(plan)
+case _ =>
+  val newPlan = tryInjectRuntimeFilter(plan)
+  if (conf.runtimeFilterSemiJoinReductionEnabled) {

Review Comment:
   +1






[GitHub] [spark] AngersZhuuuu commented on pull request #36736: [SPARK-39351][SQL] SHOW CREATE TABLE should redact properties

2022-06-02 Thread GitBox


AngersZhuuuu commented on PR #36736:
URL: https://github.com/apache/spark/pull/36736#issuecomment-1145528214

   ping @cloud-fan 





[GitHub] [spark] AngersZhuuuu commented on a diff in pull request #36754: [SPARK-39367][DOCS][SQL] Review and fix issues in Scala/Java API docs of SQL module

2022-06-02 Thread GitBox


AngersZhuuuu commented on code in PR #36754:
URL: https://github.com/apache/spark/pull/36754#discussion_r888563938


##
sql/catalyst/src/main/java/org/apache/spark/sql/util/NumericHistogram.java:
##
@@ -44,10 +44,14 @@
  *   4. In Hive's code, the method [[merge()] pass a serialized histogram,
  *  in Spark, this method pass a deserialized histogram.
  *  Here we change the code about merge bins.
+ *
+ * @since 3.3.0

Review Comment:
   > cc @AngersZhuuuu
   
   Later LGTM






[GitHub] [spark] sigmod commented on a diff in pull request #36755: [SPARK-39368][SQL] Move `RewritePredicateSubquery` into `InjectRuntimeFilter`

2022-06-02 Thread GitBox


sigmod commented on code in PR #36755:
URL: https://github.com/apache/spark/pull/36755#discussion_r888560970


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala:
##
@@ -288,7 +288,13 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J
 case s: Subquery if s.correlated => plan
 case _ if !conf.runtimeFilterSemiJoinReductionEnabled &&
   !conf.runtimeFilterBloomFilterEnabled => plan
-case _ => tryInjectRuntimeFilter(plan)
+case _ =>
+  val newPlan = tryInjectRuntimeFilter(plan)
+  if (conf.runtimeFilterSemiJoinReductionEnabled) {

Review Comment:
   how about 
   
   `if (conf.runtimeFilterSemiJoinReductionEnabled && !plan.fastEquals(newPlan))`
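   For illustration, a hedged sketch of how that guard could slot into the match arm (the shape of the `else` branch is assumed here, not copied from the PR):
   ```scala
   case _ =>
     val newPlan = tryInjectRuntimeFilter(plan)
     if (conf.runtimeFilterSemiJoinReductionEnabled && !plan.fastEquals(newPlan)) {
       RewritePredicateSubquery(newPlan)
     } else {
       newPlan
     }
   ```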






[GitHub] [spark] HyukjinKwon commented on pull request #36683: [SPARK-39301][SQL][PYTHON] Leverage LocalRelation and respect Arrow batch size in createDataFrame with Arrow optimization

2022-06-02 Thread GitBox


HyukjinKwon commented on PR #36683:
URL: https://github.com/apache/spark/pull/36683#issuecomment-1145517326

   Gentle ping for a review :-). I know it has some trade-offs, but I believe this addresses more common cases and benefits more users.





[GitHub] [spark] AmplabJenkins commented on pull request #36752: [SPARK-39259] Evaluate timestamps consistently in subqueries

2022-06-02 Thread GitBox


AmplabJenkins commented on PR #36752:
URL: https://github.com/apache/spark/pull/36752#issuecomment-1145517157

   Can one of the admins verify this patch?





[GitHub] [spark] AmplabJenkins commented on pull request #36753: [SPARK-39259] Evaluate timestamps consistently in subqueries

2022-06-02 Thread GitBox


AmplabJenkins commented on PR #36753:
URL: https://github.com/apache/spark/pull/36753#issuecomment-1145517135

   Can one of the admins verify this patch?





[GitHub] [spark] beliefer commented on a diff in pull request #36714: [SPARK-39320][SQL] Support aggregate function `MEDIAN`

2022-06-02 Thread GitBox


beliefer commented on code in PR #36714:
URL: https://github.com/apache/spark/pull/36714#discussion_r888554738


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/percentiles.scala:
##
@@ -359,6 +359,32 @@ case class Percentile(
   )
 }
 
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = "_FUNC_(col) - Returns the median of numeric or ansi interval column `col`.",
+  examples = """
+Examples:
+  > SELECT _FUNC_(col) FROM VALUES (0), (10) AS tab(col);
+   5.0
+  > SELECT _FUNC_(col) FROM VALUES (INTERVAL '0' MONTH), (INTERVAL '10' MONTH) AS tab(col);
+   5.0
+  """,
+  group = "agg_funcs",
+  since = "3.4.0")
+// scalastyle:on line.size.limit
+case class Median(child: Expression)
+  extends AggregateFunction
+with RuntimeReplaceableAggregate
+with ImplicitCastInputTypes
+with UnaryLike[Expression] {

Review Comment:
   I got it.






[GitHub] [spark] wangyum commented on pull request #36755: [SPARK-39368][SQL] Move `RewritePredicateSubquery` into `InjectRuntimeFilter`

2022-06-02 Thread GitBox


wangyum commented on PR #36755:
URL: https://github.com/apache/spark/pull/36755#issuecomment-1145503284

   cc @sigmod @cloud-fan 





[GitHub] [spark] wangyum opened a new pull request, #36755: [SPARK-39368][SQL] Move `RewritePredicateSubquery` into `InjectRuntimeFilter`

2022-06-02 Thread GitBox


wangyum opened a new pull request, #36755:
URL: https://github.com/apache/spark/pull/36755

   ### What changes were proposed in this pull request?
   
   This PR moves `RewritePredicateSubquery` into `InjectRuntimeFilter`.
   
   ### Why are the changes needed?
   
   Reduce the number of `RewritePredicateSubquery` runs, since 
`spark.sql.optimizer.runtimeFilter.semiJoinReduction.enabled` is disabled by 
default. For example:
   ```
   build/sbt "sql/testOnly *TPCDSQuerySuite"
   ```
   Before this PR:
   ```
   ...
   org.apache.spark.sql.catalyst.optimizer.RewritePredicateSubquery  17978319 / 31026106 26 / 624
   ...
   ```
   After this PR:
   ```
   ...
   org.apache.spark.sql.catalyst.optimizer.RewritePredicateSubquery  16680901 / 18994542 26 / 312
   ...
   ```
   
   ### Does this PR introduce _any_ user-facing change?
   
   No.
   
   ### How was this patch tested?
   
   Existing test.





[GitHub] [spark] dongjoon-hyun commented on pull request #36697: [SPARK-39313][SQL] `toCatalystOrdering` should fail if V2Expression can not be translated

2022-06-02 Thread GitBox


dongjoon-hyun commented on PR #36697:
URL: https://github.com/apache/spark/pull/36697#issuecomment-1145480808

   Thank you, @pan3793 , @sunchao , @cloud-fan !





[GitHub] [spark] HyukjinKwon commented on pull request #36701: [SPARK-39179][PYTHON][TESTS] Improve the test coverage for pyspark/shuffle.py

2022-06-02 Thread GitBox


HyukjinKwon commented on PR #36701:
URL: https://github.com/apache/spark/pull/36701#issuecomment-114542

   LGTM otherwise.





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #36701: [SPARK-39179][PYTHON][TESTS] Improve the test coverage for pyspark/shuffle.py

2022-06-02 Thread GitBox


HyukjinKwon commented on code in PR #36701:
URL: https://github.com/apache/spark/pull/36701#discussion_r888519926


##
python/pyspark/tests/test_shuffle.py:
##
@@ -117,6 +169,37 @@ def legit_merge_combiners(x, y):
 m.mergeCombiners(map(lambda x_y1: (x_y1[0], [x_y1[1]]), data))
 
 
+class ExternalGroupByTests(unittest.TestCase):
+def setUp(self):
+self.N = 1 << 20
+values = [i for i in range(self.N)]
+keys = [i for i in range(2)]
+import itertools
+
+self.data = [value for value in itertools.product(keys, values)]
+self.agg = Aggregator(
+lambda x: [x], lambda x, y: x.append(y) or x, lambda x, y: x.extend(y) or x
+)
+
+def test_medium_dataset(self):
+# SPARK-39179: Test external group by for medium dataset
+m = ExternalGroupBy(self.agg, 5, partitions=3)
+m.mergeValues(self.data)
+self.assertTrue(m.spills >= 1)
+self.assertEqual(sum(sum(v) for k, v in m.items()), 2 * sum(range(self.N)))
+
+def test_dataset_with_keys_are_unsorted(self):
+# SPARK-39179: Test external group when numbers of keys are greater than SORT KEY Limit.
+m = ExternalGroupBy(self.agg, 5, partitions=3)
+try:
+m.SORT_KEY_LIMIT = 1
+m.mergeValues(self.data)
+self.assertTrue(m.spills >= 1)
+self.assertEqual(sum(sum(v) for k, v in m.items()), 2 * sum(range(self.N)))
+finally:
+m.SORT_KEY_LIMIT = 1000

Review Comment:
   Let's probably do this way:
   
   ```python
   origin = m.SORT_KEY_LIMIT
   try:
   m.SORT_KEY_LIMIT = ...
   finally:
   m.SORT_KEY_LIMIT = origin
   ```
   
   In this way, if somebody changes the default value in the main code, we 
won't have to fix the test together.






[GitHub] [spark] HyukjinKwon commented on a diff in pull request #36701: [SPARK-39179][PYTHON][TESTS] Improve the test coverage for pyspark/shuffle.py

2022-06-02 Thread GitBox


HyukjinKwon commented on code in PR #36701:
URL: https://github.com/apache/spark/pull/36701#discussion_r888519237


##
python/pyspark/tests/test_shuffle.py:
##
@@ -54,6 +63,49 @@ def test_medium_dataset(self):
 self.assertTrue(m.spills >= 1)
 self.assertEqual(sum(sum(v) for k, v in m.items()), sum(range(self.N)) * 3)
 
+def test_shuffle_data_with_multiple_locations(self):
+# SPARK-39179: Test shuffle of data with multiple location also check
+# shuffle locations get randomized
+
+with tempfile.TemporaryDirectory() as tempdir1, tempfile.TemporaryDirectory() as tempdir2:
+os.environ["SPARK_LOCAL_DIRS"] = tempdir1 + "," + tempdir2
+index_of_tempdir1 = [False, False]
+for idx in range(10):
+m = ExternalMerger(self.agg, 20)
+if m.localdirs[0].startswith(tempdir1):
+index_of_tempdir1[0] = True
+elif m.localdirs[1].startswith(tempdir1):
+index_of_tempdir1[1] = True
+m.mergeValues(self.data)
+self.assertTrue(m.spills >= 1)
+self.assertEqual(sum(sum(v) for k, v in m.items()), sum(range(self.N)))
+self.assertTrue(index_of_tempdir1[0] and (index_of_tempdir1[0] == index_of_tempdir1[1]))
+del os.environ["SPARK_LOCAL_DIRS"]

Review Comment:
   @pralabhkumar let's probably restore it to the original value so other tests won't be affected. For example, after this test, `SPARK_LOCAL_DIRS` will be removed.






[GitHub] [spark] HyukjinKwon closed pull request #36754: [SPARK-39367][DOCS][SQL] Review and fix issues in Scala/Java API docs of SQL module

2022-06-02 Thread GitBox


HyukjinKwon closed pull request #36754: [SPARK-39367][DOCS][SQL] Review and fix 
issues in Scala/Java API docs of SQL module
URL: https://github.com/apache/spark/pull/36754





[GitHub] [spark] HyukjinKwon commented on pull request #36754: [SPARK-39367][DOCS][SQL] Review and fix issues in Scala/Java API docs of SQL module

2022-06-02 Thread GitBox


HyukjinKwon commented on PR #36754:
URL: https://github.com/apache/spark/pull/36754#issuecomment-1145475718

   Merged to master and branch-3.3.





[GitHub] [spark] sunchao commented on pull request #36750: [SPARK-29260][SQL] Support `ALTER DATABASE SET LOCATION` if HMS supports

2022-06-02 Thread GitBox


sunchao commented on PR #36750:
URL: https://github.com/apache/spark/pull/36750#issuecomment-1145471603

   > Lastly, could you make the PR description up-to-date? For example, the 
following seems to need some changes.
   > 
   > > This PR removes the check so that the command works as long as the Hive 
version used by the HMS...
   
   Updated the PR description.





[GitHub] [spark] github-actions[bot] closed pull request #35329: [SPARK-33326][SQL] Update Partition statistic parameters after ANALYZE TABLE ... PARTITION()

2022-06-02 Thread GitBox


github-actions[bot] closed pull request #35329: [SPARK-33326][SQL] Update 
Partition statistic parameters after ANALYZE TABLE ... PARTITION()
URL: https://github.com/apache/spark/pull/35329





[GitHub] [spark] dongjoon-hyun commented on a diff in pull request #36750: [SPARK-29260][SQL] Support `ALTER DATABASE SET LOCATION` if HMS supports

2022-06-02 Thread GitBox


dongjoon-hyun commented on code in PR #36750:
URL: https://github.com/apache/spark/pull/36750#discussion_r888508050


##
sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala:
##
@@ -355,14 +355,17 @@ private[hive] class HiveClientImpl(
   }
 
   override def alterDatabase(database: CatalogDatabase): Unit = withHiveState {
-if (!getDatabase(database.name).locationUri.equals(database.locationUri)) {
-  // SPARK-29260: Enable supported versions once it support altering database location.
-  if (!(version.equals(hive.v3_0) || version.equals(hive.v3_1))) {
-throw QueryCompilationErrors.alterDatabaseLocationUnsupportedError(version.fullVersion)
-  }
-}
+val loc = getDatabase(database.name).locationUri
+val changeLoc = !database.locationUri.equals(loc)
+
 val hiveDb = toHiveDatabase(database)
 shim.alterDatabase(client, database.name, hiveDb)
+
+if (changeLoc && getDatabase(database.name).locationUri.equals(loc)) {
+  // Some Hive versions don't support changing database location, so we check here to see if
+  // the location is actually changed, and throw an error if not.
+  throw QueryCompilationErrors.alterDatabaseLocationUnsupportedError()
+}

Review Comment:
   If some HMS forks raise exceptions, it will be enough because it will 
propagate to Spark users.
   So, this is only for silent HMS.






[GitHub] [spark] HyukjinKwon commented on a diff in pull request #36693: [SPARK-39349] Add a centralized CheckError method for QA of error path

2022-06-02 Thread GitBox


HyukjinKwon commented on code in PR #36693:
URL: https://github.com/apache/spark/pull/36693#discussion_r888508847


##
sql/catalyst/src/main/scala/org/apache/spark/sql/AnalysisException.scala:
##
@@ -54,10 +72,34 @@ class AnalysisException protected[sql] (
   messageParameters: Array[String],
   origin: Origin) =
 this(
-  SparkThrowableHelper.getMessage(errorClass, messageParameters),
+  SparkThrowableHelper.getMessage(errorClass, None, messageParameters),
+  line = origin.line,
+  startPosition = origin.startPosition,
+  errorClass = Some(errorClass),
+  errorSubClass = None,
+  messageParameters = messageParameters)
+
+  def this(
+errorClass: String,
+errorSubClass: String,
+messageParameters: Array[String]) =

Review Comment:
   Just dropping a comment for a nit. maybe would have to be four spaces per 
https://github.com/databricks/scala-style-guide#spacing-and-indentation:
   
   ```scala
 def this(
 errorClass: String,
 errorSubClass: String,
 messageParameters: Array[String]) =
   ```






[GitHub] [spark] dongjoon-hyun commented on a diff in pull request #36750: [SPARK-29260][SQL] Support `ALTER DATABASE SET LOCATION` if HMS supports

2022-06-02 Thread GitBox


dongjoon-hyun commented on code in PR #36750:
URL: https://github.com/apache/spark/pull/36750#discussion_r888508050


##
sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala:
##
@@ -355,14 +355,17 @@ private[hive] class HiveClientImpl(
   }
 
   override def alterDatabase(database: CatalogDatabase): Unit = withHiveState {
-if (!getDatabase(database.name).locationUri.equals(database.locationUri)) {
-  // SPARK-29260: Enable supported versions once it support altering 
database location.
-  if (!(version.equals(hive.v3_0) || version.equals(hive.v3_1))) {
-throw 
QueryCompilationErrors.alterDatabaseLocationUnsupportedError(version.fullVersion)
-  }
-}
+val loc = getDatabase(database.name).locationUri
+val changeLoc = !database.locationUri.equals(loc)
+
 val hiveDb = toHiveDatabase(database)
 shim.alterDatabase(client, database.name, hiveDb)
+
+if (changeLoc && getDatabase(database.name).locationUri.equals(loc)) {
+  // Some Hive versions don't support changing database location, so we 
check here to see if
+  // the location is actually changed, and throw an error if not.
+  throw QueryCompilationErrors.alterDatabaseLocationUnsupportedError()
+}

Review Comment:
   If some HMS forks raise exceptions, it will be enough because it will 
propagate to Spark users.
   So, this is only for silent HMS.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] dongjoon-hyun commented on a diff in pull request #36750: [SPARK-29260][SQL] Support `ALTER DATABASE SET LOCATION` if HMS supports

2022-06-02 Thread GitBox


dongjoon-hyun commented on code in PR #36750:
URL: https://github.com/apache/spark/pull/36750#discussion_r888508050


##
sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala:
##
@@ -355,14 +355,17 @@ private[hive] class HiveClientImpl(
   }
 
   override def alterDatabase(database: CatalogDatabase): Unit = withHiveState {
-if (!getDatabase(database.name).locationUri.equals(database.locationUri)) {
-  // SPARK-29260: Enable supported versions once it support altering 
database location.
-  if (!(version.equals(hive.v3_0) || version.equals(hive.v3_1))) {
-throw 
QueryCompilationErrors.alterDatabaseLocationUnsupportedError(version.fullVersion)
-  }
-}
+val loc = getDatabase(database.name).locationUri
+val changeLoc = !database.locationUri.equals(loc)
+
 val hiveDb = toHiveDatabase(database)
 shim.alterDatabase(client, database.name, hiveDb)
+
+if (changeLoc && getDatabase(database.name).locationUri.equals(loc)) {
+  // Some Hive versions don't support changing database location, so we 
check here to see if
+  // the location is actually changed, and throw an error if not.
+  throw QueryCompilationErrors.alterDatabaseLocationUnsupportedError()
+}

Review Comment:
   If some HMS raises exceptions, it will be enough because it will propagate 
to Spark users.
   So, this is only for silent HMS.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] wangyum commented on a diff in pull request #36750: [SPARK-29260][SQL] Support `ALTER DATABASE SET LOCATION` if HMS supports

2022-06-02 Thread GitBox


wangyum commented on code in PR #36750:
URL: https://github.com/apache/spark/pull/36750#discussion_r888507803


##
sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala:
##
@@ -355,14 +355,17 @@ private[hive] class HiveClientImpl(
   }
 
   override def alterDatabase(database: CatalogDatabase): Unit = withHiveState {
-if (!getDatabase(database.name).locationUri.equals(database.locationUri)) {
-  // SPARK-29260: Enable supported versions once it support altering 
database location.
-  if (!(version.equals(hive.v3_0) || version.equals(hive.v3_1))) {
-throw 
QueryCompilationErrors.alterDatabaseLocationUnsupportedError(version.fullVersion)
-  }
-}
+val loc = getDatabase(database.name).locationUri
+val changeLoc = !database.locationUri.equals(loc)
+
 val hiveDb = toHiveDatabase(database)
 shim.alterDatabase(client, database.name, hiveDb)
+
+if (changeLoc && getDatabase(database.name).locationUri.equals(loc)) {
+  // Some Hive versions don't support changing database location, so we 
check here to see if
+  // the location is actually changed, and throw an error if not.
+  throw QueryCompilationErrors.alterDatabaseLocationUnsupportedError()
+}

Review Comment:
   Just silently ignore it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] viirya commented on a diff in pull request #36750: [SPARK-29260][SQL] Support `ALTER DATABASE SET LOCATION` if HMS supports

2022-06-02 Thread GitBox


viirya commented on code in PR #36750:
URL: https://github.com/apache/spark/pull/36750#discussion_r888506874


##
sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala:
##
@@ -355,14 +355,17 @@ private[hive] class HiveClientImpl(
   }
 
   override def alterDatabase(database: CatalogDatabase): Unit = withHiveState {
-if (!getDatabase(database.name).locationUri.equals(database.locationUri)) {
-  // SPARK-29260: Enable supported versions once it support altering 
database location.
-  if (!(version.equals(hive.v3_0) || version.equals(hive.v3_1))) {
-throw 
QueryCompilationErrors.alterDatabaseLocationUnsupportedError(version.fullVersion)
-  }
-}
+val loc = getDatabase(database.name).locationUri
+val changeLoc = !database.locationUri.equals(loc)
+
 val hiveDb = toHiveDatabase(database)
 shim.alterDatabase(client, database.name, hiveDb)
+
+if (changeLoc && getDatabase(database.name).locationUri.equals(loc)) {
+  // Some Hive versions don't support changing database location, so we 
check here to see if
+  // the location is actually changed, and throw an error if not.
+  throw QueryCompilationErrors.alterDatabaseLocationUnsupportedError()
+}

Review Comment:
   So for the Hive server which doesn't support it, there is no exception in 
the above `alterDatabase`; it just silently ignores it?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] viirya commented on a diff in pull request #36750: [SPARK-29260][SQL] Support `ALTER DATABASE SET LOCATION` if HMS supports

2022-06-02 Thread GitBox


viirya commented on code in PR #36750:
URL: https://github.com/apache/spark/pull/36750#discussion_r888506364


##
sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala:
##
@@ -1628,8 +1628,8 @@ object QueryCompilationErrors extends QueryErrorsBase {
 new AnalysisException(s"$tableIdentifier should be converted to 
HadoopFsRelation.")
   }
 
-  def alterDatabaseLocationUnsupportedError(version: String): Throwable = {
-new AnalysisException(s"Hive $version does not support altering database 
location")
+  def alterDatabaseLocationUnsupportedError(): Throwable = {
+new AnalysisException(s"Hive metastore does not support altering database 
location")

Review Comment:
   nit:
   ```suggestion
   new AnalysisException("Hive metastore does not support altering database 
location")
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] dtenedor commented on pull request #36745: [SPARK-39359][SQL] Restrict DEFAULT columns to allowlist of supported data source types

2022-06-02 Thread GitBox


dtenedor commented on PR #36745:
URL: https://github.com/apache/spark/pull/36745#issuecomment-1145458556

   @sadikovi thanks for your review, these are helpful ideas! Please look again 
when you have time.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] dtenedor commented on a diff in pull request #36745: [SPARK-39359][SQL] Restrict DEFAULT columns to allowlist of supported data source types

2022-06-02 Thread GitBox


dtenedor commented on code in PR #36745:
URL: https://github.com/apache/spark/pull/36745#discussion_r888505739


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala:
##
@@ -427,6 +428,7 @@ class SessionCatalog(
   tableDefinition.copy(identifier = tableIdentifier)
 }
 
+
ResolveDefaultColumns.checkDataSourceSupportsDefaultColumns(tableDefinition)

Review Comment:
   This is not supported currently; the feature is SQL-only as of now. 
References to columns named "default" will simply return "column not found" 
errors until then. We can certainly consider adding this feature to the 
DataFrame API and PySpark later; I would be interested in helping with this.
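   
   For illustration, what the SQL-only usage looks like today (a sketch assuming 
a `spark` SparkSession; the exact DDL/DML forms are those defined by the 
DEFAULT-column feature itself):
   
   ```scala
   spark.sql("CREATE TABLE t (id INT, flag BOOLEAN DEFAULT true) USING parquet")
   spark.sql("INSERT INTO t VALUES (1, DEFAULT)")   // the engine fills in the default
   // There is no DataFrame or PySpark API yet for declaring or referencing these defaults.
   ```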



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] dtenedor commented on a diff in pull request #36745: [SPARK-39359][SQL] Restrict DEFAULT columns to allowlist of supported data source types

2022-06-02 Thread GitBox


dtenedor commented on code in PR #36745:
URL: https://github.com/apache/spark/pull/36745#discussion_r888505435


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala:
##
@@ -231,4 +232,18 @@ object ResolveDefaultColumns {
   }
 }
   }
+
+  def checkDataSourceSupportsDefaultColumns(table: CatalogTable): Unit = {
+if (table.schema.fields.map(_.metadata).exists { m =>
+  m.contains(CURRENT_DEFAULT_COLUMN_METADATA_KEY) ||
+m.contains(EXISTS_DEFAULT_COLUMN_METADATA_KEY)
+}) {
+  table.provider.getOrElse("").toLowerCase() match {
+case "csv" | "json" | "parquet" | "orc" =>

Review Comment:
   Good ideas! I made this a configuration option. It does support DataSource 
V2, I added a couple more tests to show that.
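   
   For illustration, a minimal sketch of such an allowlist check in plain Scala 
(the config key and helper names below are placeholders, not the actual ones 
added in this PR):
   
   ```scala
   object DefaultColumnAllowlistSketch {
     // `allowedProvidersConf` stands in for the value of a SQL conf such as a
     // hypothetical "spark.sql.defaultColumn.allowedProviders".
     def isProviderAllowed(provider: Option[String], allowedProvidersConf: String): Boolean = {
       val allowed = allowedProvidersConf.toLowerCase.split(",").map(_.trim).filter(_.nonEmpty).toSet
       provider.map(_.toLowerCase).exists(allowed.contains)
     }
   
     def main(args: Array[String]): Unit = {
       val conf = "csv, json, parquet, orc"
       assert(isProviderAllowed(Some("Parquet"), conf))   // case-insensitive match
       assert(!isProviderAllowed(Some("text"), conf))     // provider not on the allowlist
       assert(!isProviderAllowed(None, conf))             // no provider recorded
     }
   }
   ```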



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] dongjoon-hyun commented on pull request #36750: [SPARK-29260][SQL] Support alter database location for Hive client versions other than 3.0/3.1

2022-06-02 Thread GitBox


dongjoon-hyun commented on PR #36750:
URL: https://github.com/apache/spark/pull/36750#issuecomment-1145456039

   Lastly, could you make the PR description up-to-date? For example, the 
following?
   > This PR removes the check so that the command works as long as the Hive 
version used by the HMS...


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] sunchao commented on a diff in pull request #36750: [SPARK-29260][SQL] Support alter database location for Hive client versions other than 3.0/3.1

2022-06-02 Thread GitBox


sunchao commented on code in PR #36750:
URL: https://github.com/apache/spark/pull/36750#discussion_r888490510


##
sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala:
##
@@ -165,19 +165,19 @@ class HiveClientSuite(version: String, allVersions: 
Seq[String])
 assert(client.getDatabase("temporary").properties.contains("flag"))
 
 // test alter database location
+val oldDatabasePath = database.locationUri.getPath
 val tempDatabasePath2 = Utils.createTempDir().toURI
-// Hive support altering database location since HIVE-8472.
+client.alterDatabase(database.copy(locationUri = tempDatabasePath2))
+val uriInCatalog = client.getDatabase("temporary").locationUri
+assert("file" === uriInCatalog.getScheme)
+
 if (version == "3.0" || version == "3.1") {
-  client.alterDatabase(database.copy(locationUri = tempDatabasePath2))
-  val uriInCatalog = client.getDatabase("temporary").locationUri
-  assert("file" === uriInCatalog.getScheme)
+  // Hive support altering database location since HIVE-8472
   assert(new Path(tempDatabasePath2.getPath).toUri.getPath === 
uriInCatalog.getPath,
 "Failed to alter database location")
 } else {
-  val e = intercept[AnalysisException] {
-client.alterDatabase(database.copy(locationUri = tempDatabasePath2))
-  }
-  assert(e.getMessage.contains("does not support altering database 
location"))
+  // .. otherwise, the command should be non-effective against older 
versions of Hive
+  assert(oldDatabasePath === uriInCatalog.getPath, "Expected database 
location to be unchanged")

Review Comment:
   Updated the PR



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] dongjoon-hyun commented on a diff in pull request #36750: [SPARK-29260][SQL] Support alter database location for Hive client versions other than 3.0/3.1

2022-06-02 Thread GitBox


dongjoon-hyun commented on code in PR #36750:
URL: https://github.com/apache/spark/pull/36750#discussion_r888477684


##
sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala:
##
@@ -165,19 +165,19 @@ class HiveClientSuite(version: String, allVersions: 
Seq[String])
 assert(client.getDatabase("temporary").properties.contains("flag"))
 
 // test alter database location
+val oldDatabasePath = database.locationUri.getPath
 val tempDatabasePath2 = Utils.createTempDir().toURI
-// Hive support altering database location since HIVE-8472.
+client.alterDatabase(database.copy(locationUri = tempDatabasePath2))
+val uriInCatalog = client.getDatabase("temporary").locationUri
+assert("file" === uriInCatalog.getScheme)
+
 if (version == "3.0" || version == "3.1") {
-  client.alterDatabase(database.copy(locationUri = tempDatabasePath2))
-  val uriInCatalog = client.getDatabase("temporary").locationUri
-  assert("file" === uriInCatalog.getScheme)
+  // Hive support altering database location since HIVE-8472
   assert(new Path(tempDatabasePath2.getPath).toUri.getPath === 
uriInCatalog.getPath,
 "Failed to alter database location")
 } else {
-  val e = intercept[AnalysisException] {
-client.alterDatabase(database.copy(locationUri = tempDatabasePath2))
-  }
-  assert(e.getMessage.contains("does not support altering database 
location"))
+  // .. otherwise, the command should be non-effective against older 
versions of Hive
+  assert(oldDatabasePath === uriInCatalog.getPath, "Expected database 
location to be unchanged")

Review Comment:
   Sorry for misleading you, @sunchao . I was wondering about this case which 
didn't throw an exception. If there is no sign of failures (or no-op), the 
customer will be surprised at the next session.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] dongjoon-hyun commented on a diff in pull request #36750: [SPARK-29260][SQL] Support alter database location for Hive client versions other than 3.0/3.1

2022-06-02 Thread GitBox


dongjoon-hyun commented on code in PR #36750:
URL: https://github.com/apache/spark/pull/36750#discussion_r888482892


##
sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala:
##
@@ -165,19 +165,19 @@ class HiveClientSuite(version: String, allVersions: 
Seq[String])
 assert(client.getDatabase("temporary").properties.contains("flag"))
 
 // test alter database location
+val oldDatabasePath = database.locationUri.getPath
 val tempDatabasePath2 = Utils.createTempDir().toURI
-// Hive support altering database location since HIVE-8472.
+client.alterDatabase(database.copy(locationUri = tempDatabasePath2))
+val uriInCatalog = client.getDatabase("temporary").locationUri
+assert("file" === uriInCatalog.getScheme)
+
 if (version == "3.0" || version == "3.1") {
-  client.alterDatabase(database.copy(locationUri = tempDatabasePath2))
-  val uriInCatalog = client.getDatabase("temporary").locationUri
-  assert("file" === uriInCatalog.getScheme)
+  // Hive support altering database location since HIVE-8472
   assert(new Path(tempDatabasePath2.getPath).toUri.getPath === 
uriInCatalog.getPath,
 "Failed to alter database location")
 } else {
-  val e = intercept[AnalysisException] {
-client.alterDatabase(database.copy(locationUri = tempDatabasePath2))
-  }
-  assert(e.getMessage.contains("does not support altering database 
location"))
+  // .. otherwise, the command should be non-effective against older 
versions of Hive
+  assert(oldDatabasePath === uriInCatalog.getPath, "Expected database 
location to be unchanged")

Review Comment:
   +1 for throwing exception!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] sunchao commented on a diff in pull request #36750: [SPARK-29260][SQL] Support alter database location for Hive client versions other than 3.0/3.1

2022-06-02 Thread GitBox


sunchao commented on code in PR #36750:
URL: https://github.com/apache/spark/pull/36750#discussion_r888482028


##
sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala:
##
@@ -165,19 +165,19 @@ class HiveClientSuite(version: String, allVersions: 
Seq[String])
 assert(client.getDatabase("temporary").properties.contains("flag"))
 
 // test alter database location
+val oldDatabasePath = database.locationUri.getPath
 val tempDatabasePath2 = Utils.createTempDir().toURI
-// Hive support altering database location since HIVE-8472.
+client.alterDatabase(database.copy(locationUri = tempDatabasePath2))
+val uriInCatalog = client.getDatabase("temporary").locationUri
+assert("file" === uriInCatalog.getScheme)
+
 if (version == "3.0" || version == "3.1") {
-  client.alterDatabase(database.copy(locationUri = tempDatabasePath2))
-  val uriInCatalog = client.getDatabase("temporary").locationUri
-  assert("file" === uriInCatalog.getScheme)
+  // Hive support altering database location since HIVE-8472
   assert(new Path(tempDatabasePath2.getPath).toUri.getPath === 
uriInCatalog.getPath,
 "Failed to alter database location")
 } else {
-  val e = intercept[AnalysisException] {
-client.alterDatabase(database.copy(locationUri = tempDatabasePath2))
-  }
-  assert(e.getMessage.contains("does not support altering database 
location"))
+  // .. otherwise, the command should be non-effective against older 
versions of Hive
+  assert(oldDatabasePath === uriInCatalog.getPath, "Expected database 
location to be unchanged")

Review Comment:
   Good idea! Instead of a warning, I'm thinking maybe we should throw an 
exception when the location is not changed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] dongjoon-hyun commented on a diff in pull request #36750: [SPARK-29260][SQL] Support alter database location for Hive client versions other than 3.0/3.1

2022-06-02 Thread GitBox


dongjoon-hyun commented on code in PR #36750:
URL: https://github.com/apache/spark/pull/36750#discussion_r888478943


##
sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala:
##
@@ -165,19 +165,19 @@ class HiveClientSuite(version: String, allVersions: 
Seq[String])
 assert(client.getDatabase("temporary").properties.contains("flag"))
 
 // test alter database location
+val oldDatabasePath = database.locationUri.getPath
 val tempDatabasePath2 = Utils.createTempDir().toURI
-// Hive support altering database location since HIVE-8472.
+client.alterDatabase(database.copy(locationUri = tempDatabasePath2))
+val uriInCatalog = client.getDatabase("temporary").locationUri
+assert("file" === uriInCatalog.getScheme)
+
 if (version == "3.0" || version == "3.1") {
-  client.alterDatabase(database.copy(locationUri = tempDatabasePath2))
-  val uriInCatalog = client.getDatabase("temporary").locationUri
-  assert("file" === uriInCatalog.getScheme)
+  // Hive support altering database location since HIVE-8472
   assert(new Path(tempDatabasePath2.getPath).toUri.getPath === 
uriInCatalog.getPath,
 "Failed to alter database location")
 } else {
-  val e = intercept[AnalysisException] {
-client.alterDatabase(database.copy(locationUri = tempDatabasePath2))
-  }
-  assert(e.getMessage.contains("does not support altering database 
location"))
+  // .. otherwise, the command should be non-effective against older 
versions of Hive
+  assert(oldDatabasePath === uriInCatalog.getPath, "Expected database 
location to be unchanged")

Review Comment:
   Can we double-check the location inside this method after invoking 
`shim.alterDatabase`?
   
   
https://github.com/apache/spark/blob/52e2717c2d1b6e1f449de5714b6e202074bac26f/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala#L357-L366



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] sunchao commented on a diff in pull request #36750: [SPARK-29260][SQL] Support alter database location for Hive client versions other than 3.0/3.1

2022-06-02 Thread GitBox


sunchao commented on code in PR #36750:
URL: https://github.com/apache/spark/pull/36750#discussion_r888477757


##
sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala:
##
@@ -165,19 +165,19 @@ class HiveClientSuite(version: String, allVersions: 
Seq[String])
 assert(client.getDatabase("temporary").properties.contains("flag"))
 
 // test alter database location
+val oldDatabasePath = database.locationUri.getPath
 val tempDatabasePath2 = Utils.createTempDir().toURI
-// Hive support altering database location since HIVE-8472.
+client.alterDatabase(database.copy(locationUri = tempDatabasePath2))
+val uriInCatalog = client.getDatabase("temporary").locationUri
+assert("file" === uriInCatalog.getScheme)
+
 if (version == "3.0" || version == "3.1") {
-  client.alterDatabase(database.copy(locationUri = tempDatabasePath2))
-  val uriInCatalog = client.getDatabase("temporary").locationUri
-  assert("file" === uriInCatalog.getScheme)
+  // Hive support altering database location since HIVE-8472
   assert(new Path(tempDatabasePath2.getPath).toUri.getPath === 
uriInCatalog.getPath,
 "Failed to alter database location")
 } else {
-  val e = intercept[AnalysisException] {
-client.alterDatabase(database.copy(locationUri = tempDatabasePath2))
-  }
-  assert(e.getMessage.contains("does not support altering database 
location"))
+  // .. otherwise, the command should be non-effective against older 
versions of Hive
+  assert(oldDatabasePath === uriInCatalog.getPath, "Expected database 
location to be unchanged")

Review Comment:
   It's hard for Spark to give a warning in this case since it doesn't know the 
Hive version used by the remote Hive metastore. It's possible in unit tests 
since both the client and the metastore are of the same Hive version.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] dongjoon-hyun commented on a diff in pull request #36750: [SPARK-29260][SQL] Support alter database location for Hive client versions other than 3.0/3.1

2022-06-02 Thread GitBox


dongjoon-hyun commented on code in PR #36750:
URL: https://github.com/apache/spark/pull/36750#discussion_r888477684


##
sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala:
##
@@ -165,19 +165,19 @@ class HiveClientSuite(version: String, allVersions: 
Seq[String])
 assert(client.getDatabase("temporary").properties.contains("flag"))
 
 // test alter database location
+val oldDatabasePath = database.locationUri.getPath
 val tempDatabasePath2 = Utils.createTempDir().toURI
-// Hive support altering database location since HIVE-8472.
+client.alterDatabase(database.copy(locationUri = tempDatabasePath2))
+val uriInCatalog = client.getDatabase("temporary").locationUri
+assert("file" === uriInCatalog.getScheme)
+
 if (version == "3.0" || version == "3.1") {
-  client.alterDatabase(database.copy(locationUri = tempDatabasePath2))
-  val uriInCatalog = client.getDatabase("temporary").locationUri
-  assert("file" === uriInCatalog.getScheme)
+  // Hive support altering database location since HIVE-8472
   assert(new Path(tempDatabasePath2.getPath).toUri.getPath === 
uriInCatalog.getPath,
 "Failed to alter database location")
 } else {
-  val e = intercept[AnalysisException] {
-client.alterDatabase(database.copy(locationUri = tempDatabasePath2))
-  }
-  assert(e.getMessage.contains("does not support altering database 
location"))
+  // .. otherwise, the command should be non-effective against older 
versions of Hive
+  assert(oldDatabasePath === uriInCatalog.getPath, "Expected database 
location to be unchanged")

Review Comment:
   Sorry for misleading you, @sunchao . I was wondering about this case which 
didn't throw an exception. If there is no sign of failures (or no-op), the 
customer will be surprised at the next session.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] dongjoon-hyun commented on a diff in pull request #36750: [SPARK-29260][SQL] Support alter database location for Hive client versions other than 3.0/3.1

2022-06-02 Thread GitBox


dongjoon-hyun commented on code in PR #36750:
URL: https://github.com/apache/spark/pull/36750#discussion_r888476932


##
sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala:
##
@@ -165,19 +165,19 @@ class HiveClientSuite(version: String, allVersions: 
Seq[String])
 assert(client.getDatabase("temporary").properties.contains("flag"))
 
 // test alter database location
+val oldDatabasePath = database.locationUri.getPath
 val tempDatabasePath2 = Utils.createTempDir().toURI
-// Hive support altering database location since HIVE-8472.
+client.alterDatabase(database.copy(locationUri = tempDatabasePath2))
+val uriInCatalog = client.getDatabase("temporary").locationUri
+assert("file" === uriInCatalog.getScheme)
+
 if (version == "3.0" || version == "3.1") {
-  client.alterDatabase(database.copy(locationUri = tempDatabasePath2))
-  val uriInCatalog = client.getDatabase("temporary").locationUri
-  assert("file" === uriInCatalog.getScheme)
+  // Hive support altering database location since HIVE-8472
   assert(new Path(tempDatabasePath2.getPath).toUri.getPath === 
uriInCatalog.getPath,
 "Failed to alter database location")
 } else {
-  val e = intercept[AnalysisException] {
-client.alterDatabase(database.copy(locationUri = tempDatabasePath2))
-  }
-  assert(e.getMessage.contains("does not support altering database 
location"))
+  // .. otherwise, the command should be non-effective against older 
versions of Hive
+  assert(oldDatabasePath === uriInCatalog.getPath, "Expected database 
location to be unchanged")

Review Comment:
   If there is no exception from the HMS side in this case, we had better give 
a warning from the Spark side here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] sunchao commented on pull request #36750: [SPARK-29260][SQL] Support alter database location for Hive client versions other than 3.0/3.1

2022-06-02 Thread GitBox


sunchao commented on PR #36750:
URL: https://github.com/apache/spark/pull/36750#issuecomment-1145425002

   The `ALTER DATABASE SET LOCATION` command will change the default location 
for new tables created afterwards. So in step 2) above, if table location is 
not explicitly specified, the new tables will be created under the new location 
defined in step 1), while the old tables remain unchanged.
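   
   For illustration (a minimal sketch assuming a `spark` SparkSession backed by 
a Hive metastore that honors the change; database, table, and path names are 
placeholders):
   
   ```scala
   spark.sql("CREATE DATABASE db1 LOCATION '/warehouse/old_loc'")
   spark.sql("CREATE TABLE db1.t_old (id INT)")        // stored under /warehouse/old_loc
   
   spark.sql("ALTER DATABASE db1 SET LOCATION '/warehouse/new_loc'")  // step 1)
   
   spark.sql("CREATE TABLE db1.t_new (id INT)")        // step 2): created under /warehouse/new_loc
   // db1.t_old keeps its original location; only tables created afterwards pick up
   // the new default database location.
   ```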


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] sunchao commented on pull request #36750: [SPARK-29260][SQL] Support alter database location for Hive client versions other than 3.0/3.1

2022-06-02 Thread GitBox


sunchao commented on PR #36750:
URL: https://github.com/apache/spark/pull/36750#issuecomment-1145407388

   Fixed. @viirya pls take another look, thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] holdenk commented on pull request #36434: [SPARK-38969][K8S] Fix Decom reporting

2022-06-02 Thread GitBox


holdenk commented on PR #36434:
URL: https://github.com/apache/spark/pull/36434#issuecomment-1145371331

   Update: with the change for increased resilience, it passes integration 
tests on my machine.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] amaliujia commented on a diff in pull request #36586: [SPARK-39236][SQL] Make CreateTable and ListTables be compatible with 3 layer namespace

2022-06-02 Thread GitBox


amaliujia commented on code in PR #36586:
URL: https://github.com/apache/spark/pull/36586#discussion_r888383953


##
sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala:
##
@@ -299,15 +313,18 @@ class CatalogSuite extends SharedSparkSession with 
AnalysisTest {
 val functionFields = 
ScalaReflection.getConstructorParameterValues(function)
 val columnFields = ScalaReflection.getConstructorParameterValues(column)
 assert(dbFields == Seq("nama", "descripta", "locata"))
-assert(tableFields == Seq("nama", "databasa", "descripta", "typa", false))
+assert(Seq(tableFields.apply(0), tableFields.apply(2), 
tableFields.apply(3),

Review Comment:
   I didn't do this because the 3rd parameter is the namespace as an Array, so 
`==` does not do a deep compare.
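   
   For example, in the Scala REPL (plain Scala, just to show why the namespace 
field needs an element-wise comparison):
   
   ```scala
   Array("db1") == Array("db1")              // false: Array equality is reference equality
   Array("db1").sameElements(Array("db1"))   // true
   Array("db1").toSeq == Array("db1").toSeq  // true: Seq has structural equality
   ```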



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] amaliujia commented on a diff in pull request #36586: [SPARK-39236][SQL] Make CreateTable and ListTables be compatible with 3 layer namespace

2022-06-02 Thread GitBox


amaliujia commented on code in PR #36586:
URL: https://github.com/apache/spark/pull/36586#discussion_r888374320


##
sql/core/src/main/scala/org/apache/spark/sql/catalog/interface.scala:
##
@@ -64,12 +64,26 @@ class Database(
 @Stable
 class Table(
 val name: String,
-@Nullable val database: String,
+@Nullable val qualifier: Array[String],
 @Nullable val description: String,
 val tableType: String,
 val isTemporary: Boolean)
   extends DefinedByConstructorParams {
 
+  def database: String = parseQualifier
+
+  def parseQualifier: String = {
+if (qualifier == null) {
+  null
+} else if (qualifier.length == 2) {
+  qualifier(1)
+} else if (qualifier.length == 1) {

Review Comment:
   For example, if a table is `catalog1.db1.table1`,
   
   in this case, would it be `catalog_name=catalog1, qualifier=[db1], table_name=table1`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] JoshRosen commented on pull request #36751: [WIP][SPARK-39336][CORE] Do not release write locks on task end.

2022-06-02 Thread GitBox


JoshRosen commented on PR #36751:
URL: https://github.com/apache/spark/pull/36751#issuecomment-1145291954

   If I recall, I think the original motivation for this "release all locks at 
the end of the task" code was to prevent indefinite "pin leaks" if tasks fail 
to properly release locks (e.g. because an iterator isn't fully consumed or 
because the task crashes). 
   
   I see how this is a problem in the case of multi-threaded tasks.
   
   As you pointed out in your comment at 
https://github.com/apache/spark/pull/35991#discussion_r887524913, 
   
   > The good news is that the code that takes out the write locks is only in 
the BlockManager and that it properly cleans up after itself (release locks, 
remove data if needed). This means we could fix this issue by removing the 
release of write locks from `releaseAllLocksForTask`.
   
   It might be a good idea to mention this "write locks are only managed in 
the BlockManager" property in a code comment near where you're changing the 
release-all-locks code.
   
   ---
   
   In principle, aren't we also vulnerable to race conditions for read locks? 
Let's say that I've configured Spark to use off-heap memory and I have a task 
where a secondary task thread is reading from that block. If the main task 
thread exits and releases the read locks, then the block manager could decide 
to evict the off-heap block while it's still being read in the secondary 
thread. I think that would be a use-after-free bug which could trigger a 
segfault.
   
   Perhaps that read lock race condition is rarer and less severe from a 
correctness point of view: 
   
   - In this scenario, the reader is a task thread belonging to the finished 
task.
   - If the reader segfaults then the JVM will die and we'll reschedule the 
tasks. This isn't ideal, but it is less severe than the writer scenario.
   - If the reader reads corrupt data because the memory region was re-used in 
a subsequent allocation then I think we're okay as long as the reading thread 
isn't performing external side effects based on the possibly-corrupt data (e.g. 
posting data to an external REST API).
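   
   A simplified model of that read-lock race (plain Scala with a 
ReentrantReadWriteLock, not Spark's actual BlockManager API; class and method 
names here are illustrative only):
   
   ```scala
   import java.util.concurrent.locks.ReentrantReadWriteLock
   
   object ReadLockRaceSketch {
     private val lock = new ReentrantReadWriteLock()
     @volatile private var block: Array[Byte] = Array.fill(16)(1.toByte)  // stands in for off-heap memory
   
     def main(args: Array[String]): Unit = {
       lock.readLock().lock()                  // main task thread pins the block for the task
       val reader = new Thread(() => {
         Thread.sleep(50)                      // secondary task thread, still reading later
         val b = block                         // may observe the "freed" block
         println(if (b == null) "read a freed block!" else s"read ${b.length} bytes")
       })
       reader.start()
   
       lock.readLock().unlock()                // analogous to releasing all locks at task end
       if (lock.writeLock().tryLock()) {       // evictor sees no readers and frees the block
         block = null
         lock.writeLock().unlock()
       }
       reader.join()
     }
   }
   ```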
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] gengliangwang commented on a diff in pull request #36754: [SPARK-39367][DOCS][SQL] Review and fix issues in Scala/Java API docs of SQL module

2022-06-02 Thread GitBox


gengliangwang commented on code in PR #36754:
URL: https://github.com/apache/spark/pull/36754#discussion_r888368841


##
sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala:
##
@@ -46,7 +46,7 @@ import org.apache.spark.sql.types._
  * As commands are executed eagerly, this also includes errors thrown during 
the execution of
  * commands, which users can see immediately.
  */
-object QueryCompilationErrors extends QueryErrorsBase {
+private[sql] object QueryCompilationErrors extends QueryErrorsBase {

Review Comment:
   This was a mistake on 3.2.1.  Let's fix it in 3.3.0



##
sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryErrorsBase.scala:
##
@@ -42,7 +42,7 @@ import org.apache.spark.sql.types.{DataType, DoubleType, 
FloatType}
  * 7. SQL expressions shall be wrapped by double quotes.
  *   For example: "earnings + 1".
  */
-trait QueryErrorsBase {
+private[sql] trait QueryErrorsBase {

Review Comment:
   This was a mistake on 3.2.1.  Let's fix it in 3.3.0



##
sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala:
##
@@ -66,7 +66,7 @@ import org.apache.spark.util.CircularBuffer
  * This does not include exceptions thrown during the eager execution of 
commands, which are
  * grouped into [[QueryCompilationErrors]].
  */
-object QueryExecutionErrors extends QueryErrorsBase {
+private[sql] object QueryExecutionErrors extends QueryErrorsBase {

Review Comment:
   This was a mistake on 3.2.1.  Let's fix it in 3.3.0



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] gengliangwang commented on a diff in pull request #36754: [SPARK-39367][DOCS][SQL] Review and fix issues in Scala/Java API docs of SQL module

2022-06-02 Thread GitBox


gengliangwang commented on code in PR #36754:
URL: https://github.com/apache/spark/pull/36754#discussion_r888303216


##
sql/catalyst/src/main/java/org/apache/spark/sql/util/NumericHistogram.java:
##
@@ -44,10 +44,14 @@
  *   4. In Hive's code, the method [[merge()] pass a serialized histogram,
  *  in Spark, this method pass a deserialized histogram.
  *  Here we change the code about merge bins.
+ *
+ * @since 3.3.0

Review Comment:
   cc @AngersZh 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] gengliangwang opened a new pull request, #36754: [SPARK-39367][DOCS][SQL] Review and fix issues in Scala/Java API docs of SQL module

2022-06-02 Thread GitBox


gengliangwang opened a new pull request, #36754:
URL: https://github.com/apache/spark/pull/36754

   
   
   ### What changes were proposed in this pull request?
   
   Compare the 3.3.0 API doc with the latest release version 3.2.1. Fix the 
following issues:
   
   * Add missing Since annotation for new APIs
   * Remove the leaking class/object in API doc
   
   ### Why are the changes needed?
   
   Improve API docs
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   No
   
   ### How was this patch tested?
   
   Existing UT
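   
   For reference, a sketch of the two kinds of fixes (class and member names 
below are placeholders, not an actual diff from this PR):
   
   ```scala
   package org.apache.spark.sql
   
   import org.apache.spark.annotation.Since
   
   // 1. A new public API gets its missing Since tag.
   @Since("3.3.0")
   class SomeNewPublicApi {
     @Since("3.3.0")
     def newMethod(): Unit = ()
   }
   
   // 2. An internal helper that leaked into the API doc is narrowed.
   private[sql] object SomeInternalHelper
   ```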


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] zhouyejoe commented on a diff in pull request #35906: [SPARK-33236][shuffle] Enable Push-based shuffle service to store state in NM level DB for work preserving restart

2022-06-02 Thread GitBox


zhouyejoe commented on code in PR #35906:
URL: https://github.com/apache/spark/pull/35906#discussion_r888299878


##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##
@@ -655,6 +744,156 @@ public void registerExecutor(String appId, 
ExecutorShuffleInfo executorInfo) {
 }
   }
 
+
+  @Override
+  public void close() {
+if (db != null) {
+  try {
+db.close();
+  } catch (IOException e) {
+logger.error("Exception closing leveldb with registered app paths info 
and "
++ "shuffle partition info", e);
+  }
+}
+  }
+
+  private void writeAppPathsInfoToDb(String appId, int attemptId, AppPathsInfo 
appPathsInfo) {
+if (db != null) {
+  try {
+byte[] key = getDbAppAttemptPathsKey(new AppAttemptId(appId, 
attemptId));
+String valueStr = mapper.writeValueAsString(appPathsInfo);
+byte[] value = valueStr.getBytes(StandardCharsets.UTF_8);
+db.put(key, value);
+  } catch (Exception e) {
+logger.error("Error saving registered app paths info", e);
+  }
+}
+  }
+
+  private void writeAppAttemptShuffleMergeInfo(
+  String appId,
+  int appAttemptId,
+  int shuffleId,
+  int shuffleMergeId) {
+if (db != null) {
+  // Write AppAttemptShuffleMergeId into LevelDB for finalized shuffles
+  try{
+byte[] dbKey = getDbAppAttemptShufflePartitionKey(
+new AppAttemptShuffleMergeId(appId, appAttemptId, shuffleId, 
shuffleMergeId));
+db.put(dbKey, new byte[0]);
+  } catch (Exception e) {
+logger.error("Error saving active app shuffle partition", e);
+  }
+}
+
+  }
+
+  private  T parseDbKey(String key, String prefix, Class valueType) 
throws IOException {
+if (!key.startsWith(prefix + DB_KEY_DELIMITER)) {
+  throw new IllegalArgumentException("expected a string starting with " + 
prefix);
+}
+String json = key.substring(prefix.length() + 1);
+return mapper.readValue(json, valueType);
+  }
+
+  private AppAttemptId parseDbAppAttemptPathsKey(String key) throws 
IOException {
+return parseDbKey(key, APP_ATTEMPT_PATH_KEY_PREFIX, AppAttemptId.class);
+  }
+
+  private AppAttemptShuffleMergeId parseDbAppAttemptShufflePartitionKey(
+  String key,
+  String prefix) throws IOException {
+return parseDbKey(key, prefix, AppAttemptShuffleMergeId.class);
+  }
+
+  private byte[] getDbKey(Object key, String prefix) throws IOException {
+// we stick a common prefix on all the keys so we can find them in the DB
+String keyJsonString = prefix + DB_KEY_DELIMITER + 
mapper.writeValueAsString(key);
+return keyJsonString.getBytes(StandardCharsets.UTF_8);
+  }
+
+  private byte[] getDbAppAttemptShufflePartitionKey(
+  AppAttemptShuffleMergeId appAttemptShuffleMergeId) throws IOException {
+return getDbKey(appAttemptShuffleMergeId, 
APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX);
+  }
+
+  private byte[] getDbAppAttemptPathsKey(AppAttemptId appAttemptId) throws 
IOException {
+return getDbKey(appAttemptId, APP_ATTEMPT_PATH_KEY_PREFIX);
+  }
+
+  @VisibleForTesting
+  void reloadAppShuffleInfo(DB db) throws IOException {
+logger.info("Reload applications merged shuffle information from DB");
+reloadActiveAppAttemptsPathInfo(db);
+reloadFinalizedAppAttemptsShuffleMergeInfo(db);
+  }
+
+  private void reloadActiveAppAttemptsPathInfo(DB db) throws IOException {
+if (db != null) {
+  DBIterator itr = db.iterator();
+  itr.seek(APP_ATTEMPT_PATH_KEY_PREFIX.getBytes(StandardCharsets.UTF_8));
+  while (itr.hasNext()) {
+Map.Entry e = itr.next();
+String key = new String(e.getKey(), StandardCharsets.UTF_8);
+if (!key.startsWith(APP_ATTEMPT_PATH_KEY_PREFIX)) {
+  break;
+}
+AppAttemptId appAttemptId = parseDbAppAttemptPathsKey(key);
+try{
+  AppPathsInfo appPathsInfo = mapper.readValue(e.getValue(), 
AppPathsInfo.class);
+  logger.info("Reloading active application {}_{} merged shuffle files 
paths",
+  appAttemptId.appId, appAttemptId.attemptId);
+  appsShuffleInfo.compute(appAttemptId.appId,
+  (appId, existingAppShuffleInfo) -> {
+if (existingAppShuffleInfo == null ||
+existingAppShuffleInfo.attemptId < appAttemptId.attemptId) 
{
+  return new AppShuffleInfo(
+  appAttemptId.appId, appAttemptId.attemptId, 
appPathsInfo);
+} else {
+  return existingAppShuffleInfo;
+}
+  });
+} catch (Exception exception) {
+  logger.error("Parsing exception is {}", exception);
+}
+  }
+}
+  }
+
+  private void reloadFinalizedAppAttemptsShuffleMergeInfo(DB db) throws 
IOException {
+if (db != null) {
+  DBIterator itr = db.iterator();
+  
itr.seek(APP_ATTEMPT_SHUFFLE_FIN

[GitHub] [spark] zhouyejoe commented on a diff in pull request #35906: [SPARK-33236][shuffle] Enable Push-based shuffle service to store state in NM level DB for work preserving restart

2022-06-02 Thread GitBox


zhouyejoe commented on code in PR #35906:
URL: https://github.com/apache/spark/pull/35906#discussion_r888299709


##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##
@@ -655,6 +744,156 @@ public void registerExecutor(String appId, 
ExecutorShuffleInfo executorInfo) {
 }
   }
 
+
+  @Override
+  public void close() {
+if (db != null) {
+  try {
+db.close();
+  } catch (IOException e) {
+logger.error("Exception closing leveldb with registered app paths info 
and "
++ "shuffle partition info", e);
+  }
+}
+  }
+
+  private void writeAppPathsInfoToDb(String appId, int attemptId, AppPathsInfo 
appPathsInfo) {
+if (db != null) {
+  try {
+byte[] key = getDbAppAttemptPathsKey(new AppAttemptId(appId, 
attemptId));
+String valueStr = mapper.writeValueAsString(appPathsInfo);
+byte[] value = valueStr.getBytes(StandardCharsets.UTF_8);
+db.put(key, value);
+  } catch (Exception e) {
+logger.error("Error saving registered app paths info", e);
+  }
+}
+  }
+
+  private void writeAppAttemptShuffleMergeInfo(
+  String appId,
+  int appAttemptId,
+  int shuffleId,
+  int shuffleMergeId) {
+if (db != null) {
+  // Write AppAttemptShuffleMergeId into LevelDB for finalized shuffles
+  try{
+byte[] dbKey = getDbAppAttemptShufflePartitionKey(
+new AppAttemptShuffleMergeId(appId, appAttemptId, shuffleId, 
shuffleMergeId));
+db.put(dbKey, new byte[0]);
+  } catch (Exception e) {
+logger.error("Error saving active app shuffle partition", e);
+  }
+}
+
+  }
+
+  private  T parseDbKey(String key, String prefix, Class valueType) 
throws IOException {
+if (!key.startsWith(prefix + DB_KEY_DELIMITER)) {
+  throw new IllegalArgumentException("expected a string starting with " + 
prefix);
+}
+String json = key.substring(prefix.length() + 1);
+return mapper.readValue(json, valueType);
+  }
+
+  private AppAttemptId parseDbAppAttemptPathsKey(String key) throws 
IOException {
+return parseDbKey(key, APP_ATTEMPT_PATH_KEY_PREFIX, AppAttemptId.class);
+  }
+
+  private AppAttemptShuffleMergeId parseDbAppAttemptShufflePartitionKey(
+  String key,
+  String prefix) throws IOException {
+return parseDbKey(key, prefix, AppAttemptShuffleMergeId.class);
+  }
+
+  private byte[] getDbKey(Object key, String prefix) throws IOException {
+// we stick a common prefix on all the keys so we can find them in the DB
+String keyJsonString = prefix + DB_KEY_DELIMITER + 
mapper.writeValueAsString(key);
+return keyJsonString.getBytes(StandardCharsets.UTF_8);
+  }
+
+  private byte[] getDbAppAttemptShufflePartitionKey(
+  AppAttemptShuffleMergeId appAttemptShuffleMergeId) throws IOException {
+return getDbKey(appAttemptShuffleMergeId, 
APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX);
+  }
+
+  private byte[] getDbAppAttemptPathsKey(AppAttemptId appAttemptId) throws 
IOException {
+return getDbKey(appAttemptId, APP_ATTEMPT_PATH_KEY_PREFIX);
+  }
+
+  @VisibleForTesting
+  void reloadAppShuffleInfo(DB db) throws IOException {
+logger.info("Reload applications merged shuffle information from DB");
+reloadActiveAppAttemptsPathInfo(db);
+reloadFinalizedAppAttemptsShuffleMergeInfo(db);
+  }
+
+  private void reloadActiveAppAttemptsPathInfo(DB db) throws IOException {
+if (db != null) {
+  DBIterator itr = db.iterator();
+  itr.seek(APP_ATTEMPT_PATH_KEY_PREFIX.getBytes(StandardCharsets.UTF_8));
+  while (itr.hasNext()) {
+Map.Entry e = itr.next();
+String key = new String(e.getKey(), StandardCharsets.UTF_8);
+if (!key.startsWith(APP_ATTEMPT_PATH_KEY_PREFIX)) {
+  break;
+}
+AppAttemptId appAttemptId = parseDbAppAttemptPathsKey(key);
+try{
+  AppPathsInfo appPathsInfo = mapper.readValue(e.getValue(), 
AppPathsInfo.class);
+  logger.info("Reloading active application {}_{} merged shuffle files 
paths",
+  appAttemptId.appId, appAttemptId.attemptId);
+  appsShuffleInfo.compute(appAttemptId.appId,
+  (appId, existingAppShuffleInfo) -> {
+if (existingAppShuffleInfo == null ||
+existingAppShuffleInfo.attemptId < appAttemptId.attemptId) 
{
+  return new AppShuffleInfo(
+  appAttemptId.appId, appAttemptId.attemptId, 
appPathsInfo);
+} else {
+  return existingAppShuffleInfo;
+}
+  });
+} catch (Exception exception) {
+  logger.error("Parsing exception is {}", exception);
+}
+  }
+}
+  }
+
+  private void reloadFinalizedAppAttemptsShuffleMergeInfo(DB db) throws 
IOException {
+if (db != null) {
+  DBIterator itr = db.iterator();
+  
itr.seek(APP_ATTEMPT_SHUFFLE_FIN

[GitHub] [spark] zhouyejoe commented on a diff in pull request #35906: [SPARK-33236][shuffle] Enable Push-based shuffle service to store state in NM level DB for work preserving restart

2022-06-02 Thread GitBox


zhouyejoe commented on code in PR #35906:
URL: https://github.com/apache/spark/pull/35906#discussion_r888299473


##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##
@@ -655,6 +744,156 @@ public void registerExecutor(String appId, 
ExecutorShuffleInfo executorInfo) {
 }
   }
 
+
+  @Override
+  public void close() {
+if (db != null) {
+  try {
+db.close();
+  } catch (IOException e) {
+logger.error("Exception closing leveldb with registered app paths info 
and "
++ "shuffle partition info", e);
+  }
+}
+  }
+
+  private void writeAppPathsInfoToDb(String appId, int attemptId, AppPathsInfo 
appPathsInfo) {
+if (db != null) {
+  try {
+byte[] key = getDbAppAttemptPathsKey(new AppAttemptId(appId, 
attemptId));
+String valueStr = mapper.writeValueAsString(appPathsInfo);
+byte[] value = valueStr.getBytes(StandardCharsets.UTF_8);
+db.put(key, value);
+  } catch (Exception e) {
+logger.error("Error saving registered app paths info", e);
+  }
+}
+  }
+
+  private void writeAppAttemptShuffleMergeInfo(
+  String appId,
+  int appAttemptId,
+  int shuffleId,
+  int shuffleMergeId) {
+if (db != null) {
+  // Write AppAttemptShuffleMergeId into LevelDB for finalized shuffles
+  try{
+byte[] dbKey = getDbAppAttemptShufflePartitionKey(
+new AppAttemptShuffleMergeId(appId, appAttemptId, shuffleId, 
shuffleMergeId));
+db.put(dbKey, new byte[0]);
+  } catch (Exception e) {
+logger.error("Error saving active app shuffle partition", e);
+  }
+}
+
+  }
+
+  private  T parseDbKey(String key, String prefix, Class valueType) 
throws IOException {
+if (!key.startsWith(prefix + DB_KEY_DELIMITER)) {
+  throw new IllegalArgumentException("expected a string starting with " + 
prefix);
+}
+String json = key.substring(prefix.length() + 1);
+return mapper.readValue(json, valueType);
+  }
+
+  private AppAttemptId parseDbAppAttemptPathsKey(String key) throws 
IOException {
+return parseDbKey(key, APP_ATTEMPT_PATH_KEY_PREFIX, AppAttemptId.class);
+  }
+
+  private AppAttemptShuffleMergeId parseDbAppAttemptShufflePartitionKey(
+  String key,
+  String prefix) throws IOException {
+return parseDbKey(key, prefix, AppAttemptShuffleMergeId.class);
+  }
+
+  private byte[] getDbKey(Object key, String prefix) throws IOException {
+// we stick a common prefix on all the keys so we can find them in the DB
+String keyJsonString = prefix + DB_KEY_DELIMITER + mapper.writeValueAsString(key);
+return keyJsonString.getBytes(StandardCharsets.UTF_8);
+  }
+
+  private byte[] getDbAppAttemptShufflePartitionKey(
+  AppAttemptShuffleMergeId appAttemptShuffleMergeId) throws IOException {
+return getDbKey(appAttemptShuffleMergeId, APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX);
+  }
+
+  private byte[] getDbAppAttemptPathsKey(AppAttemptId appAttemptId) throws IOException {
+return getDbKey(appAttemptId, APP_ATTEMPT_PATH_KEY_PREFIX);
+  }
+
+  @VisibleForTesting
+  void reloadAppShuffleInfo(DB db) throws IOException {
+logger.info("Reload applications merged shuffle information from DB");
+reloadActiveAppAttemptsPathInfo(db);
+reloadFinalizedAppAttemptsShuffleMergeInfo(db);
+  }
+
+  private void reloadActiveAppAttemptsPathInfo(DB db) throws IOException {
+if (db != null) {
+  DBIterator itr = db.iterator();
+  itr.seek(APP_ATTEMPT_PATH_KEY_PREFIX.getBytes(StandardCharsets.UTF_8));
+  while (itr.hasNext()) {
+Map.Entry<byte[], byte[]> e = itr.next();
+String key = new String(e.getKey(), StandardCharsets.UTF_8);
+if (!key.startsWith(APP_ATTEMPT_PATH_KEY_PREFIX)) {
+  break;
+}
+AppAttemptId appAttemptId = parseDbAppAttemptPathsKey(key);
+try{
+  AppPathsInfo appPathsInfo = mapper.readValue(e.getValue(), AppPathsInfo.class);
+  logger.info("Reloading active application {}_{} merged shuffle files paths",
+  appAttemptId.appId, appAttemptId.attemptId);
+  appsShuffleInfo.compute(appAttemptId.appId,
+  (appId, existingAppShuffleInfo) -> {
+if (existingAppShuffleInfo == null ||
+existingAppShuffleInfo.attemptId < appAttemptId.attemptId) {
+  return new AppShuffleInfo(
+  appAttemptId.appId, appAttemptId.attemptId, appPathsInfo);
+} else {
+  return existingAppShuffleInfo;

Review Comment:
   Added.



##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##
@@ -655,6 +744,156 @@ public void registerExecutor(String appId, ExecutorShuffleInfo executorInfo) {
 }
   }
 
+
+  @Override
+  public void close() {
+if (db != null) {
+  try {
+db.close();
+

[GitHub] [spark] zhouyejoe commented on a diff in pull request #35906: [SPARK-33236][shuffle] Enable Push-based shuffle service to store state in NM level DB for work preserving restart

2022-06-02 Thread GitBox


zhouyejoe commented on code in PR #35906:
URL: https://github.com/apache/spark/pull/35906#discussion_r888299188


##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##
@@ -992,6 +1233,45 @@ AppShufflePartitionInfo getPartitionInfo() {
 }
   }
 
+  /**
+   * Simply encodes an application attempt ID.
+   */
+  public static class AppAttemptId {

Review Comment:
   Removed all the equals and hashCode methods from these JSON bean classes. Is that required?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] zhouyejoe commented on a diff in pull request #35906: [SPARK-33236][shuffle] Enable Push-based shuffle service to store state in NM level DB for work preserving restart

2022-06-02 Thread GitBox


zhouyejoe commented on code in PR #35906:
URL: https://github.com/apache/spark/pull/35906#discussion_r888298796


##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##
@@ -576,6 +661,7 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) {
   } finally {
 partition.closeAllFilesAndDeleteIfNeeded(false);
   }
+  cleanUpAppShufflePartitionInfoInDB(partition.appAttemptShuffleMergeId);

Review Comment:
   Removed the cleanup though.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] zhouyejoe commented on a diff in pull request #35906: [SPARK-33236][shuffle] Enable Push-based shuffle service to store state in NM level DB for work preserving restart

2022-06-02 Thread GitBox


zhouyejoe commented on code in PR #35906:
URL: https://github.com/apache/spark/pull/35906#discussion_r888298391


##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##
@@ -576,6 +661,7 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) {
   } finally {
 partition.closeAllFilesAndDeleteIfNeeded(false);
   }
+  cleanUpAppShufflePartitionInfoInDB(partition.appAttemptShuffleMergeId);

Review Comment:
   Test to be added



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] zhouyejoe commented on a diff in pull request #35906: [SPARK-33236][shuffle] Enable Push-based shuffle service to store state in NM level DB for work preserving restart

2022-06-02 Thread GitBox


zhouyejoe commented on code in PR #35906:
URL: https://github.com/apache/spark/pull/35906#discussion_r888298248


##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##
@@ -209,9 +246,16 @@ private AppShufflePartitionInfo getOrCreateAppShufflePartitionInfo(
 appShuffleInfo.getMergedShuffleIndexFilePath(shuffleId, shuffleMergeId, reduceId));
   File metaFile =
 appShuffleInfo.getMergedShuffleMetaFile(shuffleId, shuffleMergeId, reduceId);
+  // Make sure unuseful non-finalized merged data/index/meta files get cleaned up
+  // during service restart
+  if (dataFile.exists()) dataFile.delete();
+  if (indexFile.exists()) indexFile.delete();
+  if (metaFile.exists()) metaFile.delete();

Review Comment:
   Removed and added comments



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] zhouyejoe commented on pull request #35906: [SPARK-33236][shuffle] Enable Push-based shuffle service to store state in NM level DB for work preserving restart

2022-06-02 Thread GitBox


zhouyejoe commented on PR #35906:
URL: https://github.com/apache/spark/pull/35906#issuecomment-1145216328

   > 
   Added a flag in closeAndDeletePartitionFilesIfNeeded to check whether DB 
cleanup is needed or not.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] MaxGekk commented on a diff in pull request #36714: [SPARK-39320][SQL] Support aggregate function `MEDIAN`

2022-06-02 Thread GitBox


MaxGekk commented on code in PR #36714:
URL: https://github.com/apache/spark/pull/36714#discussion_r888289663


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/percentiles.scala:
##
@@ -359,6 +359,32 @@ case class Percentile(
   )
 }
 
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = "_FUNC_(col) - Returns the median of numeric or ansi interval column `col`.",
+  examples = """
+Examples:
+  > SELECT _FUNC_(col) FROM VALUES (0), (10) AS tab(col);
+   5.0
+  > SELECT _FUNC_(col) FROM VALUES (INTERVAL '0' MONTH), (INTERVAL '10' MONTH) AS tab(col);
+   5.0
+  """,
+  group = "agg_funcs",
+  since = "3.4.0")
+// scalastyle:on line.size.limit
+case class Median(child: Expression)
+  extends AggregateFunction
+with RuntimeReplaceableAggregate
+with ImplicitCastInputTypes
+with UnaryLike[Expression] {

Review Comment:
   Should be aligned to `extends`, see 
https://github.com/databricks/scala-style-guide#spacing-and-indentation
   ```suggestion
 with RuntimeReplaceableAggregate
 with ImplicitCastInputTypes
 with UnaryLike[Expression] {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] MaxGekk commented on a diff in pull request #36714: [SPARK-39320][SQL] Support aggregate function `MEDIAN`

2022-06-02 Thread GitBox


MaxGekk commented on code in PR #36714:
URL: https://github.com/apache/spark/pull/36714#discussion_r888289663


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/percentiles.scala:
##
@@ -359,6 +359,32 @@ case class Percentile(
   )
 }
 
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = "_FUNC_(col) - Returns the median of numeric or ansi interval column `col`.",
+  examples = """
+Examples:
+  > SELECT _FUNC_(col) FROM VALUES (0), (10) AS tab(col);
+   5.0
+  > SELECT _FUNC_(col) FROM VALUES (INTERVAL '0' MONTH), (INTERVAL '10' MONTH) AS tab(col);
+   5.0
+  """,
+  group = "agg_funcs",
+  since = "3.4.0")
+// scalastyle:on line.size.limit
+case class Median(child: Expression)
+  extends AggregateFunction
+with RuntimeReplaceableAggregate
+with ImplicitCastInputTypes
+with UnaryLike[Expression] {

Review Comment:
   Should be aligned to `extends`, see 
https://github.com/databricks/scala-style-guide#spacing-and-indentation
   ```suggestion
 with RuntimeReplaceableAggregate
 with ImplicitCastInputTypes
 with UnaryLike[Expression] {
   ```
   ```suggestion
   with RuntimeReplaceableAggregate
   with ImplicitCastInputTypes
   with UnaryLike[Expression] {
   ```



##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/percentiles.scala:
##
@@ -359,6 +359,32 @@ case class Percentile(
   )
 }
 
+// scalastyle:off line.size.limit

Review Comment:
   Is this really needed?



##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/percentiles.scala:
##
@@ -359,6 +359,32 @@ case class Percentile(
   )
 }
 
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = "_FUNC_(col) - Returns the median of numeric or ansi interval column `col`.",
+  examples = """
+Examples:
+  > SELECT _FUNC_(col) FROM VALUES (0), (10) AS tab(col);
+   5.0
+  > SELECT _FUNC_(col) FROM VALUES (INTERVAL '0' MONTH), (INTERVAL '10' MONTH) AS tab(col);
+   5.0
+  """,
+  group = "agg_funcs",
+  since = "3.4.0")
+// scalastyle:on line.size.limit
+case class Median(child: Expression)
+  extends AggregateFunction
+with RuntimeReplaceableAggregate
+with ImplicitCastInputTypes
+with UnaryLike[Expression] {

Review Comment:
   Should be aligned to `extends`, see 
https://github.com/databricks/scala-style-guide#spacing-and-indentation
   ```suggestion
 with RuntimeReplaceableAggregate
 with ImplicitCastInputTypes
 with UnaryLike[Expression] {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] olaky opened a new pull request, #36753: [SPARK-39259] Evaluate timestamps consistently in subqueries

2022-06-02 Thread GitBox


olaky opened a new pull request, #36753:
URL: https://github.com/apache/spark/pull/36753

   
   
   ### What changes were proposed in this pull request?
   
   
   
   ### Why are the changes needed?
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   
   ### How was this patch tested?
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] olaky opened a new pull request, #36752: [SPARK-39259] Evaluate timestamps consistently in subqueries

2022-06-02 Thread GitBox


olaky opened a new pull request, #36752:
URL: https://github.com/apache/spark/pull/36752

   
   
   ### What changes were proposed in this pull request?
   
   
   
   ### Why are the changes needed?
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   
   ### How was this patch tested?
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] zhouyejoe commented on a diff in pull request #35906: [SPARK-33236][shuffle] Enable Push-based shuffle service to store state in NM level DB for work preserving restart

2022-06-02 Thread GitBox


zhouyejoe commented on code in PR #35906:
URL: https://github.com/apache/spark/pull/35906#discussion_r888286186


##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##
@@ -536,9 +619,11 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) {
 }
   }
   // Even when the mergePartitionsInfo is null, we mark the shuffle as finalized but the results
-  // sent to the driver will be empty. This cam happen when the service didn't receive any
+  // sent to the driver will be empty. This can happen when the service didn't receive any
   // blocks for the shuffle yet and the driver didn't wait for enough time to finalize the
   // shuffle.
+  writeAppAttemptShuffleMergeInfo(

Review Comment:
   Moved the write prior to the final return in this method.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] zhouyejoe commented on a diff in pull request #35906: [SPARK-33236][shuffle] Enable Push-based shuffle service to store state in NM level DB for work preserving restart

2022-06-02 Thread GitBox


zhouyejoe commented on code in PR #35906:
URL: https://github.com/apache/spark/pull/35906#discussion_r888284976


##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##
@@ -342,6 +389,29 @@ void closeAndDeletePartitionFilesIfNeeded(
 if (cleanupLocalDirs) {
   deleteExecutorDirs(appShuffleInfo);
 }
+cleanUpAppShuffleInfoInDB(appShuffleInfo);

Review Comment:
   Yes, applicationRemoved will trigger the DB deletion of AppAttemptPathsInfo 
directly. But the merged shuffle metadata will still be deleted asynchronously 
with the cleanupExecutor.
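
   To illustrate the ordering described above, here is a hedged Scala sketch (the actual resolver code is Java, and the method and class names below are placeholders rather than the real API): the app attempt path metadata is removed from the DB synchronously when the application is removed, while the merged shuffle metadata is cleaned up asynchronously on the dedicated cleanup executor.

   ```scala
   import java.util.concurrent.Executors

   // Illustrative only: placeholder names, not the RemoteBlockPushResolver API.
   object CleanupOrderingSketch {
     private val cleanupExecutor = Executors.newSingleThreadExecutor()

     def applicationRemoved(appId: String): Unit = {
       // Path metadata for the app attempt is deleted from the DB right away...
       removeAppAttemptPathInfoFromDB(appId)
       // ...while per-shuffle merge metadata (and merged files) are removed
       // asynchronously on the cleanup executor.
       cleanupExecutor.execute(() => removeAppShuffleMergeInfoFromDB(appId))
     }

     private def removeAppAttemptPathInfoFromDB(appId: String): Unit = ()
     private def removeAppShuffleMergeInfoFromDB(appId: String): Unit = ()
   }
   ```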



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] dtenedor commented on pull request #36672: [SPARK-39265][SQL] Support vectorized Parquet scans with DEFAULT values

2022-06-02 Thread GitBox


dtenedor commented on PR #36672:
URL: https://github.com/apache/spark/pull/36672#issuecomment-1145200667

   @HyukjinKwon the CI passes now :)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] MaxGekk closed pull request #36749: [SPARK-39295][DOCS][PYTHON][3.3] Improve documentation of pandas API supported list

2022-06-02 Thread GitBox


MaxGekk closed pull request #36749: [SPARK-39295][DOCS][PYTHON][3.3] Improve 
documentation of pandas API supported list
URL: https://github.com/apache/spark/pull/36749


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] MaxGekk commented on pull request #36749: [SPARK-39295][DOCS][PYTHON][3.3] Improve documentation of pandas API supported list

2022-06-02 Thread GitBox


MaxGekk commented on PR #36749:
URL: https://github.com/apache/spark/pull/36749#issuecomment-1145199197

   +1, LGTM. Merging to 3.3.
   Thank you, @beobest2 and @HyukjinKwon @Yikun for review.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] MaxGekk commented on pull request #36654: [SPARK-39259][SQL] Evaluate timestamps consistently in subqueries

2022-06-02 Thread GitBox


MaxGekk commented on PR #36654:
URL: https://github.com/apache/spark/pull/36654#issuecomment-1145197543

   @olaky Could you open separate PRs with backports to branch-3.3 and 
branch-3.2 (according to SPARK-39259, 3.2 has this issue)?
   
   Congratulations on your first contribution to Apache Spark, and welcome to 
the Spark community!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] MaxGekk closed pull request #36654: [SPARK-39259][SQL] Evaluate timestamps consistently in subqueries

2022-06-02 Thread GitBox


MaxGekk closed pull request #36654: [SPARK-39259][SQL] Evaluate timestamps 
consistently in subqueries
URL: https://github.com/apache/spark/pull/36654


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] MaxGekk commented on pull request #36654: [SPARK-39259][SQL] Evaluate timestamps consistently in subqueries

2022-06-02 Thread GitBox


MaxGekk commented on PR #36654:
URL: https://github.com/apache/spark/pull/36654#issuecomment-1145192594

   +1, LGTM. Merging to master, 3.3, 3.2.
   Thank you, @olaky.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] MaxGekk commented on pull request #36654: [SPARK-39259][SQL] Evaluate timestamps consistently in subqueries

2022-06-02 Thread GitBox


MaxGekk commented on PR #36654:
URL: https://github.com/apache/spark/pull/36654#issuecomment-1145192596

   +1, LGTM. Merging to master, 3.3, 3.2.
   Thank you, @olaky.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] akpatnam25 commented on a diff in pull request #36734: [SPARK-38987][SHUFFLE] Throw FetchFailedException when merged shuffle blocks are corrupted and spark.shuffle.detectCorrupt is se

2022-06-02 Thread GitBox


akpatnam25 commented on code in PR #36734:
URL: https://github.com/apache/spark/pull/36734#discussion_r888273505


##
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala:
##
@@ -4402,12 +4501,20 @@ object DAGSchedulerSuite {
   def makeMapStatus(host: String, reduces: Int, sizes: Byte = 2, mapTaskId: Long = -1): MapStatus =
 MapStatus(makeBlockManagerId(host), Array.fill[Long](reduces)(sizes), mapTaskId)
 
-  def makeBlockManagerId(host: String): BlockManagerId = {
-BlockManagerId(host + "-exec", host, 12345)
+  def makeBlockManagerId(host: String, isShufflePushMerger: Boolean = false): BlockManagerId = {

Review Comment:
   agreed, changed it to accept an optional argument 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] viirya commented on pull request #36750: [SPARK-29260][SQL] Support alter database location for Hive client versions other than 3.0/3.1

2022-06-02 Thread GitBox


viirya commented on PR #36750:
URL: https://github.com/apache/spark/pull/36750#issuecomment-1145188518

   `AlterNamespaceSetLocationSuite` seems failed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] attilapiros commented on a diff in pull request #36512: [SPARK-39152][CORE] Deregistering disk persisted local blocks in case of IO related errors

2022-06-02 Thread GitBox


attilapiros commented on code in PR #36512:
URL: https://github.com/apache/spark/pull/36512#discussion_r888265229


##
core/src/main/scala/org/apache/spark/storage/BlockManager.scala:
##
@@ -933,46 +935,56 @@ private[spark] class BlockManager(
   })
   Some(new BlockResult(ci, DataReadMethod.Memory, info.size))
 } else if (level.useDisk && diskStore.contains(blockId)) {
+  var diskData: BlockData = null
   try {
-val diskData = diskStore.getBytes(blockId)
-val iterToReturn: Iterator[Any] = {
-  if (level.deserialized) {
-val diskValues = serializerManager.dataDeserializeStream(
-  blockId,
-  diskData.toInputStream())(info.classTag)
-maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
-  } else {
-val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
-  .map { _.toInputStream(dispose = false) }
-  .getOrElse { diskData.toInputStream() }
-serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
-  }
+diskData = diskStore.getBytes(blockId)
+val iterToReturn = if (level.deserialized) {
+  val diskValues = serializerManager.dataDeserializeStream(
+blockId,
+diskData.toInputStream())(info.classTag)
+  maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
+} else {
+  val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
+.map { _.toInputStream(dispose = false) }
+.getOrElse { diskData.toInputStream() }
+  serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
 }
 val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
   releaseLockAndDispose(blockId, diskData, taskContext)
 })
 Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
   } catch {
-case ex: KryoException if ex.getCause.isInstanceOf[IOException] =>
-  // We need to have detailed log message to catch environmental problems easily.
-  // Further details: https://issues.apache.org/jira/browse/SPARK-37710
-  processKryoException(ex, blockId)
-  throw ex
+case t: Throwable =>
+  if (diskData != null) {
+diskData.dispose()
+diskData = null
+  }
+  releaseLock(blockId, taskContext)
+  if (isIORelatedException(t)) {
+logInfo(extendMessageWithBlockDetails(t.getMessage, blockId))
+// Remove the block so that its unavailability is reported to the driver
+removeBlock(blockId)

Review Comment:
   I have updated the description and the title. @Ngone51 is there something 
else?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] otterc commented on a diff in pull request #35906: [SPARK-33236][shuffle] Enable Push-based shuffle service to store state in NM level DB for work preserving restart

2022-06-02 Thread GitBox


otterc commented on code in PR #35906:
URL: https://github.com/apache/spark/pull/35906#discussion_r888251830


##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##
@@ -342,6 +389,29 @@ void closeAndDeletePartitionFilesIfNeeded(
 if (cleanupLocalDirs) {
   deleteExecutorDirs(appShuffleInfo);
 }
+cleanUpAppShuffleInfoInDB(appShuffleInfo);

Review Comment:
   Could you please clarify what's the proposal?
   
   > Do we want to delete the app attempt paths immediately, and do the shuffle 
deletes async (along with path deletes like here) ?
   
   Are we saying to delete app attempt paths metadata that is save in db should 
be cleaned up immediately and the merged shuffle metadata that is saved in db 
would be deleted asynchronously? 
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] otterc commented on a diff in pull request #36734: [SPARK-38987][SHUFFLE] Throw FetchFailedException when merged shuffle blocks are corrupted and spark.shuffle.detectCorrupt is set to

2022-06-02 Thread GitBox


otterc commented on code in PR #36734:
URL: https://github.com/apache/spark/pull/36734#discussion_r888231614


##
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala:
##
@@ -4402,12 +4501,20 @@ object DAGSchedulerSuite {
   def makeMapStatus(host: String, reduces: Int, sizes: Byte = 2, mapTaskId: Long = -1): MapStatus =
 MapStatus(makeBlockManagerId(host), Array.fill[Long](reduces)(sizes), mapTaskId)
 
-  def makeBlockManagerId(host: String): BlockManagerId = {
-BlockManagerId(host + "-exec", host, 12345)
+  def makeBlockManagerId(host: String, isShufflePushMerger: Boolean = false): BlockManagerId = {

Review Comment:
   how about adding execId as optional argument? Will prevent future changes to 
this method. 
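
   A minimal sketch of what that could look like, assuming it slots into the `DAGSchedulerSuite` helper object (hypothetical shape only, not the final patch): the helper takes an optional execId and falls back to the current host-based default.

   ```scala
   import org.apache.spark.storage.BlockManagerId

   // Hypothetical signature; the actual change may differ in naming and defaults.
   def makeBlockManagerId(
       host: String,
       execId: Option[String] = None): BlockManagerId = {
     // Keep the existing "<host>-exec" convention when no execId is supplied.
     BlockManagerId(execId.getOrElse(host + "-exec"), host, 12345)
   }
   ```

   The push-merger case from the diff above could then pass the merger identifier explicitly, e.g. `makeBlockManagerId(host, Some(BlockManagerId.SHUFFLE_MERGER_IDENTIFIER))` (assuming that is the constant used in the PR).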



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] hvanhovell commented on pull request #36751: [WIP][SPARK-39336][CORE] Do not release write locks on task end.

2022-06-02 Thread GitBox


hvanhovell commented on PR #36751:
URL: https://github.com/apache/spark/pull/36751#issuecomment-1145135863

   This is still a WIP. If we think this is the right thing to do, then I will 
add some tests.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] hvanhovell opened a new pull request, #36751: [WIP][SPARK-39336][CORE] Do not release write locks on task end.

2022-06-02 Thread GitBox


hvanhovell opened a new pull request, #36751:
URL: https://github.com/apache/spark/pull/36751

   ### What changes were proposed in this pull request?
   This PR removes the unlocking of write locks on task end from the 
`BlockInfoManager`.
   
   ### Why are the changes needed?
   The `BlockInfoManager` releases all locks held by a task when the task is 
done. It also release write locks, the problem with that is that a thread 
(other than the main task thread) might still be modifying the block. By 
releasing it the block now seems readable, and a reader might observe a block 
in a partial or non-existent state.
   
   This is a follow-up for https://github.com/apache/spark/pull/35991.
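
   To make the intent concrete, a simplified sketch of the idea (illustrative bookkeeping only, not the actual `BlockInfoManager` code): task completion releases only read locks, while write locks stay held until the writing thread explicitly unlocks the block.

   ```scala
   import scala.collection.mutable

   // Illustrative lock bookkeeping, not Spark's BlockInfoManager.
   class TaskLockTrackerSketch {
     private val readLocksByTask = mutable.Map.empty[Long, mutable.Set[String]]
     private val writeLocksByTask = mutable.Map.empty[Long, mutable.Set[String]]

     def releaseAllLocksForTask(taskId: Long): Seq[String] = synchronized {
       // Read locks held by the finished task are released...
       val released = readLocksByTask.remove(taskId).map(_.toSeq).getOrElse(Seq.empty)
       // ...but write locks are intentionally NOT released here: another thread may
       // still be materializing the block, and releasing the write lock would let
       // readers see a partial or missing block.
       released
     }

     def unlockWrite(taskId: Long, blockId: String): Unit = synchronized {
       // The writer releases its own write lock once the block is fully committed.
       writeLocksByTask.get(taskId).foreach(_.remove(blockId))
     }
   }
   ```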
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   ### How was this patch tested?
   Existing tests should pass. I can add a tests if folks are onboard with this 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] dongjoon-hyun commented on pull request #36750: [SPARK-29260][SQL] Support alter database location for Hive client versions other than 3.0/3.1

2022-06-02 Thread GitBox


dongjoon-hyun commented on PR #36750:
URL: https://github.com/apache/spark/pull/36750#issuecomment-1145110352

   Thank you for pinging me, @sunchao 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


