[GitHub] spark issue #22614: [SPARK-25561][SQL] Implement a new config to control par...
Github user kmanamcheri commented on the issue: https://github.com/apache/spark/pull/22614

> Based on my understanding, the solution of FB team is to retry the following commands multiple times:
>
> ```
> getPartitionsByFilterMethod.invoke(hive, table, filter).asInstanceOf[JArrayList[Partition]]
> ```

@gatorsmile hmm my understanding was different. I thought they were retrying the fetchAllpartitions method. Maybe @tejasapatil can clarify here?

> This really depends on what is the actual errors that fail `getPartitionsByFilterMethod`. When there are many concurrent users share the same metastore, `exponential backoff with retries` is very reasonable since most of errors might be caused by timeout or similar reasons.

Doesn't this apply with every other HMS API as well? If so, shouldn't we be building a complete solution in HiveShim around this to do an `exponential backoff with retries` on every single HMS call in HiveShim?

> If it still fails, I would suggest to fail fast or depends on the conf value of `spark.sql.hive.metastorePartitionPruning.fallback.enabled`

Ok I agree. I think we need clarification from @tejasapatil on which call they retry.

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
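For reference, the "exponential backoff with retries" idea discussed above can be sketched generically as follows. This is only a minimal illustration under assumptions, not what the FB team or Spark actually does: `RetryWithBackoff`, `retry`, `delayMs` and their parameters are hypothetical names, and the operation being retried is passed in as a plain function rather than a real HMS call.

```scala
import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

// Hypothetical helper for illustration; not part of Spark or Hive.
object RetryWithBackoff {

  /** Delay before the retry that follows failed attempt `attempt` (0-based):
    * baseDelayMs * 2^attempt, capped at maxDelayMs. */
  def delayMs(attempt: Int, baseDelayMs: Long, maxDelayMs: Long): Long =
    math.min(maxDelayMs, baseDelayMs << math.min(attempt, 30))

  /** Runs `op` at least once and at most `maxAttempts` times, sleeping with
    * exponentially growing delays between failed attempts. Returns the first
    * success, or the last failure once the attempts are exhausted. */
  def retry[T](maxAttempts: Int, baseDelayMs: Long, maxDelayMs: Long)(op: () => T): Try[T] = {
    @tailrec
    def loop(attempt: Int): Try[T] = Try(op()) match {
      case s @ Success(_) => s
      case f @ Failure(_) if attempt + 1 >= maxAttempts => f
      case Failure(_) =>
        Thread.sleep(delayMs(attempt, baseDelayMs, maxDelayMs))
        loop(attempt + 1)
    }
    loop(0)
  }
}
```

Whether a retry helps at all depends on the failure being transient (for example a timeout under load). A filter that HMS does not support will fail the same way on every attempt, which is why the question of which call is being retried matters.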
[GitHub] spark issue #22614: [SPARK-25561][SQL] Implement a new config to control par...
Github user kmanamcheri commented on the issue: https://github.com/apache/spark/pull/22614

@gatorsmile, @tejasapatil was reviewing the code before I added the new config option. I have asked him to review the new code. Let's see what his thoughts are on that. I have also asked him for clarification on what he means by exponential backoff with retries.

I want to take a step back and revisit [SPARK-17992](https://issues.apache.org/jira/browse/SPARK-17992) and in particular [one of the comments](https://github.com/apache/spark/pull/15673#issuecomment-257120666) from @ericl

> For large tables, the degraded performance should be considered a bug as well.
>
> How about this.
>
> If direct sql is disabled, log a warning about degraded performance with this flag and fall back to fetching all partitions.
>
> If direct sql is enabled, crash with a message suggesting to disable filesource partition management and report a bug.
>
> That way, we will know if there are cases where metastore pruning fails with direct sql enabled.

It looks like a compromise was reached where fetching all partitions is not always supported, only for a subset of cases. My suggested fix is a cleaner way of approaching it through a SQLConf instead of looking at the Hive config. Thoughts @mallman @ericl
[GitHub] spark pull request #22614: [SPARK-25561][SQL] Implement a new config to cont...
Github user kmanamcheri commented on a diff in the pull request: https://github.com/apache/spark/pull/22614#discussion_r223498018 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala --- @@ -746,34 +746,20 @@ private[client] class Shim_v0_13 extends Shim_v0_12 { getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]] } else { logDebug(s"Hive metastore filter is '$filter'.") -val tryDirectSqlConfVar = HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL -// We should get this config value from the metaStore. otherwise hit SPARK-18681. -// To be compatible with hive-0.12 and hive-0.13, In the future we can achieve this by: -// val tryDirectSql = hive.getMetaConf(tryDirectSqlConfVar.varname).toBoolean -val tryDirectSql = hive.getMSC.getConfigValue(tryDirectSqlConfVar.varname, - tryDirectSqlConfVar.defaultBoolVal.toString).toBoolean try { // Hive may throw an exception when calling this method in some circumstances, such as - // when filtering on a non-string partition column when the hive config key - // hive.metastore.try.direct.sql is false + // when filtering on a non-string partition column. getPartitionsByFilterMethod.invoke(hive, table, filter) .asInstanceOf[JArrayList[Partition]] } catch { - case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] && - !tryDirectSql => + case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] => --- End diff -- Could you review the newer changes I have done? Basically, yes, I agree that fetching all partitions is going to be bad and hence we'll leave it up to the user. They can disable fetching all the partitions by setting "spark.sql.hive.metastorePartitionPruning.fallback.enabled" to false. In that case, we'll never retry. If it is set to "true", then we'll retry. As simple as that. I don't completely understand "exponential backoff with retries". Do you do this at the HMS level? or at the query level? 
If HMS filter pushdown fails once, does it mean it will succeed in the future? Maybe a future improvement could replace the boolean "fallback-enabled" or "fallback-disabled" with multiple levels for trying the fallback, with timing etc. Thoughts @tejasapatil
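To make the behavior described above concrete, here is a minimal, self-contained sketch of the flag-controlled fallback. It is an illustration under assumptions, not the actual HiveShim code: `FakeMetaException` stands in for Hive's `MetaException`, the two fetch functions stand in for the reflective `getPartitionsByFilterMethod` and `getAllPartitionsMethod` calls, and `fallbackEnabled` stands in for the value of the proposed SQLConf.

```scala
import java.lang.reflect.InvocationTargetException

// Stand-in for Hive's MetaException (hypothetical, for this sketch only).
final class FakeMetaException(msg: String) extends Exception(msg)

object PartitionFallback {

  /** Tries the filtered fetch first. If it fails with a (wrapped) metastore
    * exception, either falls back to fetching everything or fails fast,
    * depending on `fallbackEnabled`. */
  def getPartitions(
      fetchByFilter: () => Seq[String],
      fetchAll: () => Seq[String],
      fallbackEnabled: Boolean): Seq[String] = {
    try {
      fetchByFilter()
    } catch {
      // Reflective calls wrap the real error, so the cause has to be inspected.
      case ex: InvocationTargetException if ex.getCause.isInstanceOf[FakeMetaException] =>
        if (fallbackEnabled) {
          // Callers are expected to handle a superset of the requested partitions.
          fetchAll()
        } else {
          throw new RuntimeException(
            "Partition filter pushdown failed and fallback is disabled", ex)
        }
    }
  }
}
```

With `fallbackEnabled = false` the pushdown failure surfaces immediately instead of triggering a potentially very slow fetch of every partition. Other exception types are not caught and propagate unchanged in both modes.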
[GitHub] spark pull request #22614: [SPARK-25561][SQL] Implement a new config to cont...
Github user kmanamcheri commented on a diff in the pull request: https://github.com/apache/spark/pull/22614#discussion_r223473324 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala --- @@ -746,34 +746,45 @@ private[client] class Shim_v0_13 extends Shim_v0_12 { getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]] } else { logDebug(s"Hive metastore filter is '$filter'.") +val shouldFallback = SQLConf.get.metastorePartitionPruningFallback val tryDirectSqlConfVar = HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL // We should get this config value from the metaStore. otherwise hit SPARK-18681. // To be compatible with hive-0.12 and hive-0.13, In the future we can achieve this by: // val tryDirectSql = hive.getMetaConf(tryDirectSqlConfVar.varname).toBoolean val tryDirectSql = hive.getMSC.getConfigValue(tryDirectSqlConfVar.varname, tryDirectSqlConfVar.defaultBoolVal.toString).toBoolean try { - // Hive may throw an exception when calling this method in some circumstances, such as - // when filtering on a non-string partition column when the hive config key - // hive.metastore.try.direct.sql is false getPartitionsByFilterMethod.invoke(hive, table, filter) .asInstanceOf[JArrayList[Partition]] } catch { - case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] && - !tryDirectSql => -logWarning("Caught Hive MetaException attempting to get partition metadata by " + - "filter from Hive. Falling back to fetching all partition metadata, which will " + - "degrade performance. 
Modifying your Hive metastore configuration to set " + - s"${tryDirectSqlConfVar.varname} to true may resolve this problem.", ex) -// HiveShim clients are expected to handle a superset of the requested partitions -getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]] - case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] && - tryDirectSql => -throw new RuntimeException("Caught Hive MetaException attempting to get partition " + - "metadata by filter from Hive. You can set the Spark configuration setting " + - s"${SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key} to false to work around this " + - "problem, however this will result in degraded performance. Please report a bug: " + - "https://issues.apache.org/jira/browse/SPARK", ex) + case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] => +if (shouldFallback) { + if (!tryDirectSql) { --- End diff -- Good idea. Done.
[GitHub] spark pull request #22614: [SPARK-25561][SQL] Implement a new config to cont...
Github user kmanamcheri commented on a diff in the pull request: https://github.com/apache/spark/pull/22614#discussion_r223469446 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala --- @@ -79,12 +82,30 @@ class HiveClientSuite(version: String) client = init(true) } - test(s"getPartitionsByFilter returns all partitions when $tryDirectSqlKey=false") { -val client = init(false) -val filteredPartitions = client.getPartitionsByFilter(client.getTable("default", "test"), - Seq(attr("ds") === 20170101)) + test(s"getPartitionsByFilter returns all partitions when $partPruningFallbackKey=true") { + withSQLConf(SQLConf.HIVE_METASTORE_PARTITION_PRUNING_FALLBACK_ENABLED.key -> "true", +SQLConf.HIVE_METASTORE_PARTITION_PRUNING.key -> "true") { + val client = init(false) + // tryDirectSql = false and a non-string partition filter will always fail. This condition + // is used to test if the fallback works + val filteredPartitions = client.getPartitionsByFilter(client.getTable("default", "test"), +Seq(attr("ds") === 20170101)) -assert(filteredPartitions.size == testPartitionCount) + assert(filteredPartitions.size == testPartitionCount) +} + } + + test(s"getPartitionsByFilter should throw an exception if $partPruningFallbackKey=false") { --- End diff -- Ok, I changed the test description to describe more accurately what we are testing.
[GitHub] spark pull request #22614: [SPARK-25561][SQL] Implement a new config to cont...
Github user kmanamcheri commented on a diff in the pull request: https://github.com/apache/spark/pull/22614#discussion_r223429117 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala --- @@ -79,12 +82,30 @@ class HiveClientSuite(version: String) client = init(true) } - test(s"getPartitionsByFilter returns all partitions when $tryDirectSqlKey=false") { -val client = init(false) -val filteredPartitions = client.getPartitionsByFilter(client.getTable("default", "test"), - Seq(attr("ds") === 20170101)) + test(s"getPartitionsByFilter returns all partitions when $partPruningFallbackKey=true") { + withSQLConf(SQLConf.HIVE_METASTORE_PARTITION_PRUNING_FALLBACK_ENABLED.key -> "true", +SQLConf.HIVE_METASTORE_PARTITION_PRUNING.key -> "true") { + val client = init(false) + // tryDirectSql = false and a non-string partition filter will always fail. This condition + // is used to test if the fallback works + val filteredPartitions = client.getPartitionsByFilter(client.getTable("default", "test"), +Seq(attr("ds") === 20170101)) -assert(filteredPartitions.size == testPartitionCount) + assert(filteredPartitions.size == testPartitionCount) +} + } + + test(s"getPartitionsByFilter should throw an exception if $partPruningFallbackKey=false") { --- End diff -- Hmm.. The behavior does not depend on the value of tryDirectSqlKey though. It depends solely on whether pruning is enabled and whether pruning.fallback is enabled. The reason we set tryDirectSqlKey to false is to have a consistent way to make pruning fail. Setting tryDirectSql to false on its own does not throw an exception.
[GitHub] spark pull request #22614: [SPARK-25561][SQL] Implement a new config to cont...
Github user kmanamcheri commented on a diff in the pull request: https://github.com/apache/spark/pull/22614#discussion_r223419868 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala --- @@ -754,26 +755,38 @@ private[client] class Shim_v0_13 extends Shim_v0_12 { tryDirectSqlConfVar.defaultBoolVal.toString).toBoolean try { // Hive may throw an exception when calling this method in some circumstances, such as - // when filtering on a non-string partition column when the hive config key - // hive.metastore.try.direct.sql is false + // when filtering on a non-string partition column. --- End diff -- Done @mallman
[GitHub] spark pull request #22614: [SPARK-25561][SQL] Implement a new config to cont...
Github user kmanamcheri commented on a diff in the pull request: https://github.com/apache/spark/pull/22614#discussion_r223418766 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala --- @@ -754,26 +755,38 @@ private[client] class Shim_v0_13 extends Shim_v0_12 { tryDirectSqlConfVar.defaultBoolVal.toString).toBoolean try { // Hive may throw an exception when calling this method in some circumstances, such as - // when filtering on a non-string partition column when the hive config key - // hive.metastore.try.direct.sql is false + // when filtering on a non-string partition column. --- End diff -- > of course any method call may throw an exception in some circumstances. :+1: Yes, you are right. I agree with removing comments which might mislead future developers. I'll rephrase this.
[GitHub] spark issue #22614: [SPARK-25561][SQL] Implement a new config to control par...
Github user kmanamcheri commented on the issue: https://github.com/apache/spark/pull/22614 @viirya I have updated the title and description.
[GitHub] spark pull request #22614: [SPARK-25561][SQL] HiveClient.getPartitionsByFilt...
Github user kmanamcheri commented on a diff in the pull request: https://github.com/apache/spark/pull/22614#discussion_r223148534 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala --- @@ -87,6 +90,18 @@ class HiveClientSuite(version: String) assert(filteredPartitions.size == testPartitionCount) } + test(s"getPartitionsByFilter should throw an exception if $partPruningFallbackKey=false") { +withSQLConf(SQLConf.HIVE_METASTORE_PARTITION_PRUNING_FALLBACK.key -> "false") { --- End diff -- Done.
[GitHub] spark pull request #22614: [SPARK-25561][SQL] HiveClient.getPartitionsByFilt...
Github user kmanamcheri commented on a diff in the pull request: https://github.com/apache/spark/pull/22614#discussion_r223148506 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -544,6 +544,15 @@ object SQLConf { .booleanConf .createWithDefault(true) + val HIVE_METASTORE_PARTITION_PRUNING_FALLBACK = +buildConf("spark.sql.hive.metastorePartitionPruningFallback") + .doc("When true, enable fallback to fetch all partitions if Hive metastore partition " + + "push down fails. This is applicable only if partition pruning is enabled (see " + + s" ${HIVE_METASTORE_PARTITION_PRUNING.key}). Enabling this may degrade performance " + + "if there are a large number of partitions." ) + .booleanConf + .createWithDefault(true) --- End diff -- Done. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22614: [SPARK-25561][SQL] HiveClient.getPartitionsByFilt...
Github user kmanamcheri commented on a diff in the pull request: https://github.com/apache/spark/pull/22614#discussion_r223139874 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala --- @@ -87,6 +90,18 @@ class HiveClientSuite(version: String) assert(filteredPartitions.size == testPartitionCount) } + test(s"getPartitionsByFilter should throw an exception if $partPruningFallbackKey=false") { +withSQLConf(SQLConf.HIVE_METASTORE_PARTITION_PRUNING_FALLBACK.key -> "false") { --- End diff -- Ok will do.
[GitHub] spark pull request #22614: [SPARK-25561][SQL] HiveClient.getPartitionsByFilt...
Github user kmanamcheri commented on a diff in the pull request: https://github.com/apache/spark/pull/22614#discussion_r223136013 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -544,6 +544,15 @@ object SQLConf { .booleanConf .createWithDefault(true) + val HIVE_METASTORE_PARTITION_PRUNING_FALLBACK = +buildConf("spark.sql.hive.metastorePartitionPruningFallback") + .doc("When true, enable fallback to fetch all partitions if Hive metastore partition " + + "push down fails. This is applicable only if partition pruning is enabled (see " + + s" ${HIVE_METASTORE_PARTITION_PRUNING.key}). Enabling this may degrade performance " + + "if there are a large number of partitions." ) + .booleanConf + .createWithDefault(true) --- End diff -- If we set that to false, SPARK-17992 will change the default behavior. Is that ok? So if we set this flag to false by default, then an exception from HMS will always be re-thrown (direct sql or not). Just want to make sure that we understand that the default behavior will change. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22614: [SPARK-25561][SQL] HiveClient.getPartitionsByFilt...
Github user kmanamcheri commented on a diff in the pull request: https://github.com/apache/spark/pull/22614#discussion_r223122115 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -544,6 +544,15 @@ object SQLConf { .booleanConf .createWithDefault(true) + val HIVE_METASTORE_PARTITION_PRUNING_FALLBACK = +buildConf("spark.sql.hive.metastorePartitionPruningFallback") --- End diff -- What is the reasoning for marking this as legacy? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22614: [SPARK-25561][SQL] HiveClient.getPartitionsByFilter shou...
Github user kmanamcheri commented on the issue: https://github.com/apache/spark/pull/22614

@gatorsmile I have added the config option and an additional test. Here's the new behavior:

- Setting spark.sql.hive.metastorePartitionPruningFallback to 'false' will ALWAYS throw an exception if partition pushdown fails (Hive throws an exception). This is suggested for queries where you want to fail fast and you know that you have a large number of partitions.
- Setting spark.sql.hive.metastorePartitionPruningFallback to 'true' (this is the default setting) will ALWAYS catch the exception from Hive and retry by fetching all partitions. However, to be helpful to users, Spark will read the directSql config value from Hive and log messages describing what to try next.

@dongjoon-hyun @mallman @vanzin If these look good, can we move on to merging this? Thanks a lot for all the comments and discussions.
[GitHub] spark issue #22614: [SPARK-25561][SQL] HiveClient.getPartitionsByFilter shou...
Github user kmanamcheri commented on the issue: https://github.com/apache/spark/pull/22614

> Let us add a conf to control it? Failing fast is better than hanging. If users want to get all partitions, they can change the conf by themselves.

@gatorsmile We already have a config option "spark.sql.hive.metastorePartitionPruning". If that is set to false, we will never push down the partition filters to HMS. I will add "spark.sql.hive.metastorePartitionPruningFallback", which, together with the existing option, controls the fallback behavior. Irrespective of the value of Hive direct SQL, if we enable the pruning fallback, we will catch the exception and fall back to fetching all partitions. Does this sound like a reasonable compromise @mallman ?
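The interaction between the two options described above can be summarised as a small decision table. The sketch below is purely illustrative and assumes the semantics stated in this comment; `PruningPlan`, `Plan` and the case object names are hypothetical and do not exist in Spark.

```scala
// Hypothetical model of how the two settings combine; not Spark code.
object PruningPlan {
  sealed trait Plan
  case object NoPushdown extends Plan           // the filter is never sent to HMS
  case object PushdownWithFallback extends Plan // try the filter, fetch all partitions on failure
  case object PushdownFailFast extends Plan     // try the filter, rethrow on failure

  /** `pruningEnabled` models spark.sql.hive.metastorePartitionPruning and
    * `fallbackEnabled` models the proposed fallback option. */
  def plan(pruningEnabled: Boolean, fallbackEnabled: Boolean): Plan =
    (pruningEnabled, fallbackEnabled) match {
      case (false, _)    => NoPushdown
      case (true, true)  => PushdownWithFallback
      case (true, false) => PushdownFailFast
    }
}
```

Note that the Hive direct SQL setting does not appear as an input: under this proposal it only influences the log message, not which path is taken.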
[GitHub] spark pull request #22614: [SPARK-25561][SQL] HiveClient.getPartitionsByFilt...
Github user kmanamcheri commented on a diff in the pull request: https://github.com/apache/spark/pull/22614#discussion_r222372452 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala --- @@ -746,34 +746,20 @@ private[client] class Shim_v0_13 extends Shim_v0_12 { getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]] } else { logDebug(s"Hive metastore filter is '$filter'.") -val tryDirectSqlConfVar = HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL -// We should get this config value from the metaStore. otherwise hit SPARK-18681. -// To be compatible with hive-0.12 and hive-0.13, In the future we can achieve this by: -// val tryDirectSql = hive.getMetaConf(tryDirectSqlConfVar.varname).toBoolean -val tryDirectSql = hive.getMSC.getConfigValue(tryDirectSqlConfVar.varname, - tryDirectSqlConfVar.defaultBoolVal.toString).toBoolean try { // Hive may throw an exception when calling this method in some circumstances, such as - // when filtering on a non-string partition column when the hive config key - // hive.metastore.try.direct.sql is false + // when filtering on a non-string partition column. getPartitionsByFilterMethod.invoke(hive, table, filter) .asInstanceOf[JArrayList[Partition]] } catch { - case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] && - !tryDirectSql => + case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] => logWarning("Caught Hive MetaException attempting to get partition metadata by " + "filter from Hive. Falling back to fetching all partition metadata, which will " + - "degrade performance. Modifying your Hive metastore configuration to set " + - s"${tryDirectSqlConfVar.varname} to true may resolve this problem.", ex) + "degrade performance. Enable direct SQL mode in hive metastore to attempt " + + "to improve performance. 
However, Hive's direct SQL mode is an optimistic " + + "optimization and does not guarantee improved performance.") --- End diff -- @dongjoon-hyun @mallman I have updated the log messages to be more descriptive and helpful for the user to indicate what they should try doing. Does that help? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22614: [SPARK-25561][SQL] HiveClient.getPartitionsByFilt...
Github user kmanamcheri commented on a diff in the pull request: https://github.com/apache/spark/pull/22614#discussion_r222359233 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala --- @@ -746,34 +746,20 @@ private[client] class Shim_v0_13 extends Shim_v0_12 { getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]] } else { logDebug(s"Hive metastore filter is '$filter'.") -val tryDirectSqlConfVar = HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL -// We should get this config value from the metaStore. otherwise hit SPARK-18681. -// To be compatible with hive-0.12 and hive-0.13, In the future we can achieve this by: -// val tryDirectSql = hive.getMetaConf(tryDirectSqlConfVar.varname).toBoolean -val tryDirectSql = hive.getMSC.getConfigValue(tryDirectSqlConfVar.varname, - tryDirectSqlConfVar.defaultBoolVal.toString).toBoolean try { // Hive may throw an exception when calling this method in some circumstances, such as - // when filtering on a non-string partition column when the hive config key - // hive.metastore.try.direct.sql is false + // when filtering on a non-string partition column. getPartitionsByFilterMethod.invoke(hive, table, filter) .asInstanceOf[JArrayList[Partition]] } catch { - case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] && - !tryDirectSql => + case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] => logWarning("Caught Hive MetaException attempting to get partition metadata by " + "filter from Hive. Falling back to fetching all partition metadata, which will " + - "degrade performance. Modifying your Hive metastore configuration to set " + - s"${tryDirectSqlConfVar.varname} to true may resolve this problem.", ex) + "degrade performance. Enable direct SQL mode in hive metastore to attempt " + + "to improve performance. 
However, Hive's direct SQL mode is an optimistic " + + "optimization and does not guarantee improved performance.") --- End diff -- @mallman I haven't tried using that config option. If I am understanding [the documentation for HIVE_MANAGE_FILESOURCE_PARTITIONS](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L547) correctly, if I set that value to false, partitions will not be stored in HMS. That sounds like it is addressing a different issue, no? If that's the suggested way to deal with non-supported partition filters, then this code should always fail if getPartitionsByFilter fails, no? Why even have a fallback (as we do currently)? SPARK-17992 seems to say that Spark should handle certain cases of partition pushdown failures (such as HMS ORM mode). My argument is that this handling should be extended to the case where hive.metastore.try.direct.sql is set to true.
[GitHub] spark pull request #22614: [SPARK-25561][SQL] HiveClient.getPartitionsByFilt...
Github user kmanamcheri commented on a diff in the pull request: https://github.com/apache/spark/pull/22614#discussion_r222348679 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala --- @@ -746,34 +746,20 @@ private[client] class Shim_v0_13 extends Shim_v0_12 { getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]] } else { logDebug(s"Hive metastore filter is '$filter'.") -val tryDirectSqlConfVar = HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL -// We should get this config value from the metaStore. otherwise hit SPARK-18681. -// To be compatible with hive-0.12 and hive-0.13, In the future we can achieve this by: -// val tryDirectSql = hive.getMetaConf(tryDirectSqlConfVar.varname).toBoolean -val tryDirectSql = hive.getMSC.getConfigValue(tryDirectSqlConfVar.varname, - tryDirectSqlConfVar.defaultBoolVal.toString).toBoolean try { // Hive may throw an exception when calling this method in some circumstances, such as - // when filtering on a non-string partition column when the hive config key - // hive.metastore.try.direct.sql is false + // when filtering on a non-string partition column. getPartitionsByFilterMethod.invoke(hive, table, filter) .asInstanceOf[JArrayList[Partition]] } catch { - case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] && - !tryDirectSql => + case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] => logWarning("Caught Hive MetaException attempting to get partition metadata by " + "filter from Hive. Falling back to fetching all partition metadata, which will " + - "degrade performance. Modifying your Hive metastore configuration to set " + - s"${tryDirectSqlConfVar.varname} to true may resolve this problem.", ex) + "degrade performance. Enable direct SQL mode in hive metastore to attempt " + + "to improve performance. 
However, Hive's direct SQL mode is an optimistic " + + "optimization and does not guarantee improved performance.") --- End diff -- @mallman The key point to note here is that setting direct sql on HMS "may" resolve the problem. It is not guaranteed. HMS only optimistically optimizes this. If direct sql on HMS fails, it will fall back on ORM and then fail again. Spark's behavior should not be inconsistent depending on the HMS config value. My suggested fix would still call getPartitionsByFilter and, if that fails, will call getAllPartitions. We won't be calling getAllPartitions in all cases. It is a fallback mechanism. @gatorsmile hmm.. why not? We know it might be slow and hence the warning. Maybe the warning message should say that this could be slow depending on the number of partitions, since partition push-down to HMS failed.
[GitHub] spark pull request #22614: [SPARK-25561][SQL] HiveClient.getPartitionsByFilt...
Github user kmanamcheri commented on a diff in the pull request: https://github.com/apache/spark/pull/22614#discussion_r222140323 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala --- @@ -746,34 +746,20 @@ private[client] class Shim_v0_13 extends Shim_v0_12 { getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]] } else { logDebug(s"Hive metastore filter is '$filter'.") -val tryDirectSqlConfVar = HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL -// We should get this config value from the metaStore. otherwise hit SPARK-18681. -// To be compatible with hive-0.12 and hive-0.13, In the future we can achieve this by: -// val tryDirectSql = hive.getMetaConf(tryDirectSqlConfVar.varname).toBoolean -val tryDirectSql = hive.getMSC.getConfigValue(tryDirectSqlConfVar.varname, - tryDirectSqlConfVar.defaultBoolVal.toString).toBoolean try { // Hive may throw an exception when calling this method in some circumstances, such as - // when filtering on a non-string partition column when the hive config key - // hive.metastore.try.direct.sql is false + // when filtering on a non-string partition column. getPartitionsByFilterMethod.invoke(hive, table, filter) .asInstanceOf[JArrayList[Partition]] } catch { - case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] && - !tryDirectSql => + case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] => logWarning("Caught Hive MetaException attempting to get partition metadata by " + "filter from Hive. Falling back to fetching all partition metadata, which will " + - "degrade performance. Modifying your Hive metastore configuration to set " + - s"${tryDirectSqlConfVar.varname} to true may resolve this problem.", ex) + "degrade performance. Enable direct SQL mode in hive metastore to attempt " + + "to improve performance. 
However, Hive's direct SQL mode is an optimistic " + + "optimization and does not guarantee improved performance.") --- End diff -- Good idea. Yes that would be better. I'll add that. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22614: [SPARK-25561][SQL] HiveClient.getPartitionsByFilt...
Github user kmanamcheri commented on a diff in the pull request: https://github.com/apache/spark/pull/22614#discussion_r222123426 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala --- @@ -746,34 +746,20 @@ private[client] class Shim_v0_13 extends Shim_v0_12 { getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]] } else { logDebug(s"Hive metastore filter is '$filter'.") -val tryDirectSqlConfVar = HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL -// We should get this config value from the metaStore. otherwise hit SPARK-18681. -// To be compatible with hive-0.12 and hive-0.13, In the future we can achieve this by: -// val tryDirectSql = hive.getMetaConf(tryDirectSqlConfVar.varname).toBoolean -val tryDirectSql = hive.getMSC.getConfigValue(tryDirectSqlConfVar.varname, - tryDirectSqlConfVar.defaultBoolVal.toString).toBoolean try { // Hive may throw an exception when calling this method in some circumstances, such as - // when filtering on a non-string partition column when the hive config key - // hive.metastore.try.direct.sql is false + // when filtering on a non-string partition column. getPartitionsByFilterMethod.invoke(hive, table, filter) .asInstanceOf[JArrayList[Partition]] } catch { - case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] && - !tryDirectSql => + case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] => --- End diff -- @gatorsmile From HMS side, the error is always the same "MetaException" and there is no way to tell apart a direct SQL error from an error of "not supported" (unfortunately!). How do you propose we address this? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22614: [SPARK-25561][SQL] HiveClient.getPartitionsByFilter shou...
Github user kmanamcheri commented on the issue: https://github.com/apache/spark/pull/22614 @mallman @cloud-fan @ericl @rezasafi
[GitHub] spark pull request #22614: HiveClient.getPartitionsByFilter should not throw...
GitHub user kmanamcheri opened a pull request:

https://github.com/apache/spark/pull/22614

HiveClient.getPartitionsByFilter should not throw an exception if HMS retries directSql

## What changes were proposed in this pull request?

When using partition filter pushdown to HMS, Spark should expect a MetaException from HMS if partition filtering is not supported and should call getAllPartitions instead. HMS is expected to throw a MetaException even if directSql is enabled.

## How was this patch tested?

Unit tests on the Spark SQL component were run successfully.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/kmanamcheri/spark SPARK-25561

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/22614.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #22614

commit dddffcae8824e72d614fd6202e7fc562c490098b
Author: Karthik Manamcheri
Date: 2018-10-02T16:11:20Z

    HiveShim should expect MetaException from HMS in all cases
[GitHub] spark pull request #15673: [SPARK-17992][SQL] Return all partitions from Hiv...
Github user kmanamcheri commented on a diff in the pull request:

https://github.com/apache/spark/pull/15673#discussion_r221107390

    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala ---
    @@ -586,17 +587,31 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
     getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]]
     } else {
     logDebug(s"Hive metastore filter is '$filter'.")
    +val tryDirectSqlConfVar = HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL
    +val tryDirectSql =
    + hive.getConf.getBoolean(tryDirectSqlConfVar.varname, tryDirectSqlConfVar.defaultBoolVal)
     try {
    + // Hive may throw an exception when calling this method in some circumstances, such as
    + // when filtering on a non-string partition column when the hive config key
    + // hive.metastore.try.direct.sql is false
     getPartitionsByFilterMethod.invoke(hive, table, filter)
     .asInstanceOf[JArrayList[Partition]]
     } catch {
    - case e: InvocationTargetException =>
    -// SPARK-18167 retry to investigate the flaky test. This should be reverted before
    -// the release is cut.
    -val retry = Try(getPartitionsByFilterMethod.invoke(hive, table, filter))
    -logError("getPartitionsByFilter failed, retry success = " + retry.isSuccess)
    -logError("all partitions: " + getAllPartitions(hive, table))
    -throw e
    + case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] &&
    + !tryDirectSql =>
    +logWarning("Caught Hive MetaException attempting to get partition metadata by " +
    + "filter from Hive. Falling back to fetching all partition metadata, which will " +
    + "degrade performance. Modifying your Hive metastore configuration to set " +
    + s"${tryDirectSqlConfVar.varname} to true may resolve this problem.", ex)
    +// HiveShim clients are expected to handle a superset of the requested partitions
    +getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]]
    + case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] &&
    + tryDirectSql =>
    +throw new RuntimeException("Caught Hive MetaException attempting to get partition " +
    --- End diff --

@mallman Your assumption is incorrect. If direct SQL fails in Hive, Hive retries with ORM. I am able to reproduce an issue with Postgres where direct SQL fails, Hive retries with ORM, and Spark then fails! Hive has fallback behavior for direct SQL. Filed SPARK-25561.
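The failure described in this comment can be illustrated with a toy model. This is only a sketch under stated assumptions and does not reproduce Hive's actual code: `metastoreGetByFilter`, `directSql`, `orm` and the nested `MetaException` are hypothetical stand-ins, and the reflective `InvocationTargetException` wrapper is left out for brevity. It shows why a guard that falls back only when direct SQL is disabled never fires when direct SQL is enabled but fails and the ORM retry rejects the filter.

```scala
object DirectSqlRetrySketch {
  // Hypothetical stand-in for Hive's MetaException.
  class MetaException(msg: String) extends Exception(msg)

  // Toy model of the metastore side: try direct SQL first, then retry via ORM.
  // directSql and orm are hypothetical stand-ins, not Hive APIs.
  def metastoreGetByFilter(
      tryDirectSql: Boolean,
      directSql: () => Seq[String],
      orm: () => Seq[String]): Seq[String] = {
    if (tryDirectSql) {
      try directSql()
      catch { case _: Exception => orm() } // the ORM retry may itself throw
    } else {
      orm()
    }
  }

  // The guard from the diff above: fall back only when direct SQL is disabled.
  def oldGuardFallsBack(tryDirectSql: Boolean, error: Throwable): Boolean =
    error.isInstanceOf[MetaException] && !tryDirectSql
}
```

In this model, when `tryDirectSql` is true and both paths fail, the caller sees a `MetaException` while `oldGuardFallsBack` returns false, so the code takes the branch that throws instead of fetching all partitions.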