[GitHub] spark issue #22614: [SPARK-25561][SQL] Implement a new config to control par...

2018-10-09 Thread kmanamcheri
Github user kmanamcheri commented on the issue:

https://github.com/apache/spark/pull/22614
  
> Based on my understanding, the solution of FB team is to retry the 
following commands multiple times:
> 
> ```
> getPartitionsByFilterMethod.invoke(hive, table, filter).asInstanceOf[JArrayList[Partition]]
> ```

@gatorsmile Hmm, my understanding was different. I thought they were 
retrying the fetchAllPartitions method. Maybe @tejasapatil can clarify here?

> This really depends on what is the actual errors that fail 
`getPartitionsByFilterMethod`. When there are many concurrent users share the 
same metastore, `exponential backoff with retries` is very reasonable since 
most of errors might be caused by timeout or similar reasons.

Doesn't this apply to every other HMS API as well? If so, shouldn't we be 
building a complete solution in HiveShim that does an `exponential 
backoff with retries` on every single HMS call?

> If it still fails, I would suggest to fail fast or depends on the conf 
value of `spark.sql.hive.metastorePartitionPruning.fallback.enabled`

Ok I agree. 

I think we need clarification from @tejasapatil on which call they retry.
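The `exponential backoff with retries` approach being discussed could be sketched generically. This is a hypothetical illustration, not Spark's actual HiveShim code; `retryWithBackoff`, `maxRetries`, and `baseDelayMs` are invented names:

```scala
// Hypothetical sketch: retry a metastore call with exponential backoff.
// Transient failures (e.g. timeouts under concurrent metastore load) often
// succeed on a later attempt; persistent errors are rethrown once retries
// are exhausted.
def retryWithBackoff[T](maxRetries: Int, baseDelayMs: Long)(call: => T): T = {
  def attempt(n: Int): T =
    try call
    catch {
      case e: Exception if n < maxRetries =>
        Thread.sleep(baseDelayMs << n) // 2^n * baseDelayMs before retrying
        attempt(n + 1)
    }
  attempt(0)
}
```

Whether the retried call is `getPartitionsByFilter` or the fetch-all path is exactly the open question in this thread.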


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22614: [SPARK-25561][SQL] Implement a new config to control par...

2018-10-08 Thread kmanamcheri
Github user kmanamcheri commented on the issue:

https://github.com/apache/spark/pull/22614
  
@gatorsmile, @tejasapatil was reviewing the code before I added the new 
config option. I have asked him to review the new code. Let's see what his 
thoughts are. I have also asked him for clarification on what he means by 
exponential backoff with retries.

I want to take a step back and revisit 
[SPARK-17992](https://issues.apache.org/jira/browse/SPARK-17992) and in 
particular [one of the 
comments](https://github.com/apache/spark/pull/15673#issuecomment-257120666) 
from @ericl 

> For large tables, the degraded performance should be considered a bug as 
well.
>
> How about this.
>
> If direct sql is disabled, log a warning about degraded performance with 
this flag and fall back to fetching all partitions.
> If direct sql is enabled, crash with a message suggesting to disable 
filesource partition management and report a bug.
> That way, we will know if there are cases where metastore pruning fails 
with direct sql enabled.

It looks like a compromise was reached where we don't fall back to fetching 
all partitions all the time, but only for a subset of cases. My suggested fix 
is a cleaner way of approaching it, through a SQLConf instead of looking at 
the Hive config. 

Thoughts @mallman @ericl 


---




[GitHub] spark pull request #22614: [SPARK-25561][SQL] Implement a new config to cont...

2018-10-08 Thread kmanamcheri
Github user kmanamcheri commented on a diff in the pull request:

https://github.com/apache/spark/pull/22614#discussion_r223498018
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala ---
@@ -746,34 +746,20 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
 getAllPartitionsMethod.invoke(hive, 
table).asInstanceOf[JSet[Partition]]
   } else {
 logDebug(s"Hive metastore filter is '$filter'.")
-val tryDirectSqlConfVar = 
HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL
-// We should get this config value from the metaStore. otherwise 
hit SPARK-18681.
-// To be compatible with hive-0.12 and hive-0.13, In the future we 
can achieve this by:
-// val tryDirectSql = 
hive.getMetaConf(tryDirectSqlConfVar.varname).toBoolean
-val tryDirectSql = 
hive.getMSC.getConfigValue(tryDirectSqlConfVar.varname,
-  tryDirectSqlConfVar.defaultBoolVal.toString).toBoolean
 try {
   // Hive may throw an exception when calling this method in some 
circumstances, such as
-  // when filtering on a non-string partition column when the hive 
config key
-  // hive.metastore.try.direct.sql is false
+  // when filtering on a non-string partition column.
   getPartitionsByFilterMethod.invoke(hive, table, filter)
 .asInstanceOf[JArrayList[Partition]]
 } catch {
-  case ex: InvocationTargetException if 
ex.getCause.isInstanceOf[MetaException] &&
-  !tryDirectSql =>
+  case ex: InvocationTargetException if 
ex.getCause.isInstanceOf[MetaException] =>
--- End diff --

Could you review the newer changes I have done? Basically, yes, I agree 
that fetching all partitions is going to be bad and hence we'll leave it up to 
the user. They can disable fetching all the partitions by setting 
"spark.sql.hive.metastorePartitionPruning.fallback.enabled" to false. In that 
case, we'll never retry. If it is set to "true", then we'll retry. As simple as 
that.

I don't completely understand "exponential backoff with retries". Do you do 
this at the HMS call level or at the query level? If HMS filter pushdown fails 
once, does that mean it will succeed in the future? Maybe a future 
improvement would be, instead of a boolean "fallback-enabled" or 
"fallback-disabled", multiple levels for trying the fallback with 
timing etc. Thoughts @tejasapatil 
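The conf-controlled behavior described above can be sketched with stand-in functions in place of the real reflective HiveShim calls. `getPartitionsWithFallback`, `pruneByFilter`, and `fetchAll` are illustrative names, not actual Spark APIs:

```scala
// Sketch of the proposed fallback logic: try pushdown first; on failure,
// either fetch all partitions (fallback enabled) or fail fast (disabled).
def getPartitionsWithFallback[T](fallbackEnabled: Boolean)(
    pruneByFilter: () => Seq[T], // stand-in for getPartitionsByFilterMethod.invoke(...)
    fetchAll: () => Seq[T]       // stand-in for getAllPartitionsMethod.invoke(...)
): Seq[T] =
  try pruneByFilter()
  catch {
    case e: RuntimeException =>
      if (fallbackEnabled) fetchAll() // degrade to fetching everything
      else throw e                    // fail fast: surface the pushdown error
  }
```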


---




[GitHub] spark pull request #22614: [SPARK-25561][SQL] Implement a new config to cont...

2018-10-08 Thread kmanamcheri
Github user kmanamcheri commented on a diff in the pull request:

https://github.com/apache/spark/pull/22614#discussion_r223473324
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala ---
@@ -746,34 +746,45 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
 getAllPartitionsMethod.invoke(hive, 
table).asInstanceOf[JSet[Partition]]
   } else {
 logDebug(s"Hive metastore filter is '$filter'.")
+val shouldFallback = SQLConf.get.metastorePartitionPruningFallback
 val tryDirectSqlConfVar = 
HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL
 // We should get this config value from the metaStore. otherwise 
hit SPARK-18681.
 // To be compatible with hive-0.12 and hive-0.13, In the future we 
can achieve this by:
 // val tryDirectSql = 
hive.getMetaConf(tryDirectSqlConfVar.varname).toBoolean
 val tryDirectSql = 
hive.getMSC.getConfigValue(tryDirectSqlConfVar.varname,
   tryDirectSqlConfVar.defaultBoolVal.toString).toBoolean
 try {
-  // Hive may throw an exception when calling this method in some 
circumstances, such as
-  // when filtering on a non-string partition column when the hive 
config key
-  // hive.metastore.try.direct.sql is false
   getPartitionsByFilterMethod.invoke(hive, table, filter)
 .asInstanceOf[JArrayList[Partition]]
 } catch {
-  case ex: InvocationTargetException if 
ex.getCause.isInstanceOf[MetaException] &&
-  !tryDirectSql =>
-logWarning("Caught Hive MetaException attempting to get 
partition metadata by " +
-  "filter from Hive. Falling back to fetching all partition 
metadata, which will " +
-  "degrade performance. Modifying your Hive metastore 
configuration to set " +
-  s"${tryDirectSqlConfVar.varname} to true may resolve this 
problem.", ex)
-// HiveShim clients are expected to handle a superset of the 
requested partitions
-getAllPartitionsMethod.invoke(hive, 
table).asInstanceOf[JSet[Partition]]
-  case ex: InvocationTargetException if 
ex.getCause.isInstanceOf[MetaException] &&
-  tryDirectSql =>
-throw new RuntimeException("Caught Hive MetaException 
attempting to get partition " +
-  "metadata by filter from Hive. You can set the Spark 
configuration setting " +
-  s"${SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key} to false 
to work around this " +
-  "problem, however this will result in degraded performance. 
Please report a bug: " +
-  "https://issues.apache.org/jira/browse/SPARK", ex)
+  case ex: InvocationTargetException if 
ex.getCause.isInstanceOf[MetaException] =>
+if (shouldFallback) {
+  if (!tryDirectSql) {
--- End diff --

Good idea. Done.


---




[GitHub] spark pull request #22614: [SPARK-25561][SQL] Implement a new config to cont...

2018-10-08 Thread kmanamcheri
Github user kmanamcheri commented on a diff in the pull request:

https://github.com/apache/spark/pull/22614#discussion_r223469446
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala 
---
@@ -79,12 +82,30 @@ class HiveClientSuite(version: String)
 client = init(true)
   }
 
-  test(s"getPartitionsByFilter returns all partitions when 
$tryDirectSqlKey=false") {
-val client = init(false)
-val filteredPartitions = 
client.getPartitionsByFilter(client.getTable("default", "test"),
-  Seq(attr("ds") === 20170101))
+  test(s"getPartitionsByFilter returns all partitions when 
$partPruningFallbackKey=true") {
+
withSQLConf(SQLConf.HIVE_METASTORE_PARTITION_PRUNING_FALLBACK_ENABLED.key -> 
"true",
+SQLConf.HIVE_METASTORE_PARTITION_PRUNING.key -> "true") {
+  val client = init(false)
+  // tryDirectSql = false and a non-string partition filter will 
always fail. This condition
+  // is used to test if the fallback works
+  val filteredPartitions = 
client.getPartitionsByFilter(client.getTable("default", "test"),
+Seq(attr("ds") === 20170101))
 
-assert(filteredPartitions.size == testPartitionCount)
+  assert(filteredPartitions.size == testPartitionCount)
+}
+  }
+
+  test(s"getPartitionsByFilter should throw an exception if 
$partPruningFallbackKey=false") {
--- End diff --

Ok I changed the test description to be more accurate of what we are 
testing.


---




[GitHub] spark pull request #22614: [SPARK-25561][SQL] Implement a new config to cont...

2018-10-08 Thread kmanamcheri
Github user kmanamcheri commented on a diff in the pull request:

https://github.com/apache/spark/pull/22614#discussion_r223429117
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala 
---
@@ -79,12 +82,30 @@ class HiveClientSuite(version: String)
 client = init(true)
   }
 
-  test(s"getPartitionsByFilter returns all partitions when 
$tryDirectSqlKey=false") {
-val client = init(false)
-val filteredPartitions = 
client.getPartitionsByFilter(client.getTable("default", "test"),
-  Seq(attr("ds") === 20170101))
+  test(s"getPartitionsByFilter returns all partitions when 
$partPruningFallbackKey=true") {
+
withSQLConf(SQLConf.HIVE_METASTORE_PARTITION_PRUNING_FALLBACK_ENABLED.key -> 
"true",
+SQLConf.HIVE_METASTORE_PARTITION_PRUNING.key -> "true") {
+  val client = init(false)
+  // tryDirectSql = false and a non-string partition filter will 
always fail. This condition
+  // is used to test if the fallback works
+  val filteredPartitions = 
client.getPartitionsByFilter(client.getTable("default", "test"),
+Seq(attr("ds") === 20170101))
 
-assert(filteredPartitions.size == testPartitionCount)
+  assert(filteredPartitions.size == testPartitionCount)
+}
+  }
+
+  test(s"getPartitionsByFilter should throw an exception if 
$partPruningFallbackKey=false") {
--- End diff --

Hmm.. The behavior does not depend on the value of tryDirectSqlKey, though. 
It depends solely on whether pruning is enabled and whether pruning.fallback 
is enabled. 

The reason we set tryDirectSqlKey to false is to create a consistent way 
for pruning to fail. Setting tryDirectSql to false by itself does not throw 
an exception.


---




[GitHub] spark pull request #22614: [SPARK-25561][SQL] Implement a new config to cont...

2018-10-08 Thread kmanamcheri
Github user kmanamcheri commented on a diff in the pull request:

https://github.com/apache/spark/pull/22614#discussion_r223419868
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala ---
@@ -754,26 +755,38 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
   tryDirectSqlConfVar.defaultBoolVal.toString).toBoolean
 try {
   // Hive may throw an exception when calling this method in some 
circumstances, such as
-  // when filtering on a non-string partition column when the hive 
config key
-  // hive.metastore.try.direct.sql is false
+  // when filtering on a non-string partition column.
--- End diff --

Done @mallman 


---




[GitHub] spark pull request #22614: [SPARK-25561][SQL] Implement a new config to cont...

2018-10-08 Thread kmanamcheri
Github user kmanamcheri commented on a diff in the pull request:

https://github.com/apache/spark/pull/22614#discussion_r223418766
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala ---
@@ -754,26 +755,38 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
   tryDirectSqlConfVar.defaultBoolVal.toString).toBoolean
 try {
   // Hive may throw an exception when calling this method in some 
circumstances, such as
-  // when filtering on a non-string partition column when the hive 
config key
-  // hive.metastore.try.direct.sql is false
+  // when filtering on a non-string partition column.
--- End diff --

> of course any method call may throw an exception in some circumstances.

:+1: Yes, you are right. I agree with removing comments which might mislead 
future developers. I'll rephrase this.


---




[GitHub] spark issue #22614: [SPARK-25561][SQL] Implement a new config to control par...

2018-10-08 Thread kmanamcheri
Github user kmanamcheri commented on the issue:

https://github.com/apache/spark/pull/22614
  
@viirya I have updated the title and description. 


---




[GitHub] spark pull request #22614: [SPARK-25561][SQL] HiveClient.getPartitionsByFilt...

2018-10-05 Thread kmanamcheri
Github user kmanamcheri commented on a diff in the pull request:

https://github.com/apache/spark/pull/22614#discussion_r223148534
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala 
---
@@ -87,6 +90,18 @@ class HiveClientSuite(version: String)
 assert(filteredPartitions.size == testPartitionCount)
   }
 
+  test(s"getPartitionsByFilter should throw an exception if 
$partPruningFallbackKey=false") {
+withSQLConf(SQLConf.HIVE_METASTORE_PARTITION_PRUNING_FALLBACK.key -> 
"false") {
--- End diff --

Done.


---




[GitHub] spark pull request #22614: [SPARK-25561][SQL] HiveClient.getPartitionsByFilt...

2018-10-05 Thread kmanamcheri
Github user kmanamcheri commented on a diff in the pull request:

https://github.com/apache/spark/pull/22614#discussion_r223148506
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -544,6 +544,15 @@ object SQLConf {
   .booleanConf
   .createWithDefault(true)
 
+  val HIVE_METASTORE_PARTITION_PRUNING_FALLBACK =
+buildConf("spark.sql.hive.metastorePartitionPruningFallback")
+  .doc("When true, enable fallback to fetch all partitions if Hive 
metastore partition " +
+   "push down fails. This is applicable only if partition pruning 
is enabled (see " +
+   s" ${HIVE_METASTORE_PARTITION_PRUNING.key}). Enabling this may 
degrade performance " +
+   "if there are a large number of partitions." )
+  .booleanConf
+  .createWithDefault(true)
--- End diff --

Done.


---




[GitHub] spark pull request #22614: [SPARK-25561][SQL] HiveClient.getPartitionsByFilt...

2018-10-05 Thread kmanamcheri
Github user kmanamcheri commented on a diff in the pull request:

https://github.com/apache/spark/pull/22614#discussion_r223139874
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala 
---
@@ -87,6 +90,18 @@ class HiveClientSuite(version: String)
 assert(filteredPartitions.size == testPartitionCount)
   }
 
+  test(s"getPartitionsByFilter should throw an exception if 
$partPruningFallbackKey=false") {
+withSQLConf(SQLConf.HIVE_METASTORE_PARTITION_PRUNING_FALLBACK.key -> 
"false") {
--- End diff --

Ok will do.


---




[GitHub] spark pull request #22614: [SPARK-25561][SQL] HiveClient.getPartitionsByFilt...

2018-10-05 Thread kmanamcheri
Github user kmanamcheri commented on a diff in the pull request:

https://github.com/apache/spark/pull/22614#discussion_r223136013
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -544,6 +544,15 @@ object SQLConf {
   .booleanConf
   .createWithDefault(true)
 
+  val HIVE_METASTORE_PARTITION_PRUNING_FALLBACK =
+buildConf("spark.sql.hive.metastorePartitionPruningFallback")
+  .doc("When true, enable fallback to fetch all partitions if Hive 
metastore partition " +
+   "push down fails. This is applicable only if partition pruning 
is enabled (see " +
+   s" ${HIVE_METASTORE_PARTITION_PRUNING.key}). Enabling this may 
degrade performance " +
+   "if there are a large number of partitions." )
+  .booleanConf
+  .createWithDefault(true)
--- End diff --

If we set that to false, we change the default behavior introduced by 
SPARK-17992. Is that ok? 

So if we set this flag to false by default, then an exception from HMS will 
always be re-thrown (direct sql or not). Just want to make sure that we 
understand that the default behavior will change.


---




[GitHub] spark pull request #22614: [SPARK-25561][SQL] HiveClient.getPartitionsByFilt...

2018-10-05 Thread kmanamcheri
Github user kmanamcheri commented on a diff in the pull request:

https://github.com/apache/spark/pull/22614#discussion_r223122115
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -544,6 +544,15 @@ object SQLConf {
   .booleanConf
   .createWithDefault(true)
 
+  val HIVE_METASTORE_PARTITION_PRUNING_FALLBACK =
+buildConf("spark.sql.hive.metastorePartitionPruningFallback")
--- End diff --

What is the reasoning for marking this as legacy?


---




[GitHub] spark issue #22614: [SPARK-25561][SQL] HiveClient.getPartitionsByFilter shou...

2018-10-05 Thread kmanamcheri
Github user kmanamcheri commented on the issue:

https://github.com/apache/spark/pull/22614
  
@gatorsmile I have added the config option and an additional test.

Here's the new behavior
- Setting spark.sql.hive.metastorePartitionPruningFallback to 'false' will 
ALWAYS throw an exception if partition pushdown fails (Hive throws an 
exception). This is suggested for queries where you want to fail fast and you 
know that you have a large number of partitions.
- Setting spark.sql.hive.metastorePartitionPruningFallback to 'true' (the 
default) will ALWAYS catch the exception from Hive and retry by fetching all 
partitions. However, to be helpful to users, Spark will read the directSql 
config value from Hive and provide good log messages on what to do next.

@dongjoon-hyun @mallman @vanzin If these look good, can we move on this to 
merge? Thanks a lot for all the comments and discussions.


---




[GitHub] spark issue #22614: [SPARK-25561][SQL] HiveClient.getPartitionsByFilter shou...

2018-10-03 Thread kmanamcheri
Github user kmanamcheri commented on the issue:

https://github.com/apache/spark/pull/22614
  
> Let us add a conf to control it? Failing fast is better than hanging. If 
users want to get all partitions, they can change the conf by themselves.

@gatorsmile We already have a config option 
"spark.sql.hive.metastorePartitionPruning". If that is set to false, we will 
never push partition filters down to HMS. I will add 
"spark.sql.hive.metastorePartitionPruningFallback", which, in addition to the 
previous one, controls the fallback behavior. Irrespective of the value of Hive 
direct SQL, if the pruning fallback is enabled, we will catch the exception and 
fall back to fetching all partitions. Does this sound like a reasonable 
compromise @mallman ?
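The interaction of the two confs can be summarized as a small decision sketch. This is illustrative only; `planPartitionFetch` and the action names are invented, not Spark code:

```scala
// How the two booleans combine: pruning gates pushdown entirely, and
// fallback decides what happens when pushdown fails.
sealed trait PruningAction
case object FetchAllUpfront extends PruningAction      // pruning off: never push filters down
case object PushDownThenFallback extends PruningAction // push down; on failure fetch all
case object PushDownFailFast extends PruningAction     // push down; on failure rethrow

def planPartitionFetch(pruningEnabled: Boolean, fallbackEnabled: Boolean): PruningAction =
  if (!pruningEnabled) FetchAllUpfront
  else if (fallbackEnabled) PushDownThenFallback
  else PushDownFailFast
```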


---




[GitHub] spark pull request #22614: [SPARK-25561][SQL] HiveClient.getPartitionsByFilt...

2018-10-03 Thread kmanamcheri
Github user kmanamcheri commented on a diff in the pull request:

https://github.com/apache/spark/pull/22614#discussion_r222372452
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala ---
@@ -746,34 +746,20 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
 getAllPartitionsMethod.invoke(hive, 
table).asInstanceOf[JSet[Partition]]
   } else {
 logDebug(s"Hive metastore filter is '$filter'.")
-val tryDirectSqlConfVar = 
HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL
-// We should get this config value from the metaStore. otherwise 
hit SPARK-18681.
-// To be compatible with hive-0.12 and hive-0.13, In the future we 
can achieve this by:
-// val tryDirectSql = 
hive.getMetaConf(tryDirectSqlConfVar.varname).toBoolean
-val tryDirectSql = 
hive.getMSC.getConfigValue(tryDirectSqlConfVar.varname,
-  tryDirectSqlConfVar.defaultBoolVal.toString).toBoolean
 try {
   // Hive may throw an exception when calling this method in some 
circumstances, such as
-  // when filtering on a non-string partition column when the hive 
config key
-  // hive.metastore.try.direct.sql is false
+  // when filtering on a non-string partition column.
   getPartitionsByFilterMethod.invoke(hive, table, filter)
 .asInstanceOf[JArrayList[Partition]]
 } catch {
-  case ex: InvocationTargetException if 
ex.getCause.isInstanceOf[MetaException] &&
-  !tryDirectSql =>
+  case ex: InvocationTargetException if 
ex.getCause.isInstanceOf[MetaException] =>
 logWarning("Caught Hive MetaException attempting to get 
partition metadata by " +
   "filter from Hive. Falling back to fetching all partition 
metadata, which will " +
-  "degrade performance. Modifying your Hive metastore 
configuration to set " +
-  s"${tryDirectSqlConfVar.varname} to true may resolve this 
problem.", ex)
+  "degrade performance. Enable direct SQL mode in hive 
metastore to attempt " +
+  "to improve performance. However, Hive's direct SQL mode is 
an optimistic " +
+  "optimization and does not guarantee improved performance.")
--- End diff --

@dongjoon-hyun @mallman I have updated the log messages to be more 
descriptive and helpful for the user to indicate what they should try doing. 
Does that help?


---




[GitHub] spark pull request #22614: [SPARK-25561][SQL] HiveClient.getPartitionsByFilt...

2018-10-03 Thread kmanamcheri
Github user kmanamcheri commented on a diff in the pull request:

https://github.com/apache/spark/pull/22614#discussion_r222359233
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala ---
@@ -746,34 +746,20 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
 getAllPartitionsMethod.invoke(hive, 
table).asInstanceOf[JSet[Partition]]
   } else {
 logDebug(s"Hive metastore filter is '$filter'.")
-val tryDirectSqlConfVar = 
HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL
-// We should get this config value from the metaStore. otherwise 
hit SPARK-18681.
-// To be compatible with hive-0.12 and hive-0.13, In the future we 
can achieve this by:
-// val tryDirectSql = 
hive.getMetaConf(tryDirectSqlConfVar.varname).toBoolean
-val tryDirectSql = 
hive.getMSC.getConfigValue(tryDirectSqlConfVar.varname,
-  tryDirectSqlConfVar.defaultBoolVal.toString).toBoolean
 try {
   // Hive may throw an exception when calling this method in some 
circumstances, such as
-  // when filtering on a non-string partition column when the hive 
config key
-  // hive.metastore.try.direct.sql is false
+  // when filtering on a non-string partition column.
   getPartitionsByFilterMethod.invoke(hive, table, filter)
 .asInstanceOf[JArrayList[Partition]]
 } catch {
-  case ex: InvocationTargetException if 
ex.getCause.isInstanceOf[MetaException] &&
-  !tryDirectSql =>
+  case ex: InvocationTargetException if 
ex.getCause.isInstanceOf[MetaException] =>
 logWarning("Caught Hive MetaException attempting to get 
partition metadata by " +
   "filter from Hive. Falling back to fetching all partition 
metadata, which will " +
-  "degrade performance. Modifying your Hive metastore 
configuration to set " +
-  s"${tryDirectSqlConfVar.varname} to true may resolve this 
problem.", ex)
+  "degrade performance. Enable direct SQL mode in hive 
metastore to attempt " +
+  "to improve performance. However, Hive's direct SQL mode is 
an optimistic " +
+  "optimization and does not guarantee improved performance.")
--- End diff --

@mallman I haven't tried using that config option. If I am understanding 
[the documentation for HIVE_MANAGE_FILESOURCE_PARTITIONS](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L547) 
correctly, if I set that value to false, partitions will not be stored in HMS. 

If that's the suggested way to deal with non-supported partition filters, 
then this code should always fail if getPartitionsByFilter fails, no? Why even 
have a fallback (as we do currently)? SPARK-17992 seems to say that Spark 
should handle certain cases of partition pushdown failures (such as HMS ORM 
mode). My argument is that the case should be expanded to include even if 
hive.metastore.try.direct.sql is enabled to be true.


---




[GitHub] spark pull request #22614: [SPARK-25561][SQL] HiveClient.getPartitionsByFilt...

2018-10-03 Thread kmanamcheri
Github user kmanamcheri commented on a diff in the pull request:

https://github.com/apache/spark/pull/22614#discussion_r222348679
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala ---
@@ -746,34 +746,20 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
 getAllPartitionsMethod.invoke(hive, 
table).asInstanceOf[JSet[Partition]]
   } else {
 logDebug(s"Hive metastore filter is '$filter'.")
-val tryDirectSqlConfVar = 
HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL
-// We should get this config value from the metaStore. otherwise 
hit SPARK-18681.
-// To be compatible with hive-0.12 and hive-0.13, In the future we 
can achieve this by:
-// val tryDirectSql = 
hive.getMetaConf(tryDirectSqlConfVar.varname).toBoolean
-val tryDirectSql = 
hive.getMSC.getConfigValue(tryDirectSqlConfVar.varname,
-  tryDirectSqlConfVar.defaultBoolVal.toString).toBoolean
 try {
   // Hive may throw an exception when calling this method in some 
circumstances, such as
-  // when filtering on a non-string partition column when the hive 
config key
-  // hive.metastore.try.direct.sql is false
+  // when filtering on a non-string partition column.
   getPartitionsByFilterMethod.invoke(hive, table, filter)
 .asInstanceOf[JArrayList[Partition]]
 } catch {
-  case ex: InvocationTargetException if 
ex.getCause.isInstanceOf[MetaException] &&
-  !tryDirectSql =>
+  case ex: InvocationTargetException if 
ex.getCause.isInstanceOf[MetaException] =>
 logWarning("Caught Hive MetaException attempting to get 
partition metadata by " +
   "filter from Hive. Falling back to fetching all partition 
metadata, which will " +
-  "degrade performance. Modifying your Hive metastore 
configuration to set " +
-  s"${tryDirectSqlConfVar.varname} to true may resolve this 
problem.", ex)
+  "degrade performance. Enable direct SQL mode in hive 
metastore to attempt " +
+  "to improve performance. However, Hive's direct SQL mode is 
an optimistic " +
+  "optimization and does not guarantee improved performance.")
--- End diff --

@mallman The key point to note here is that setting direct sql on HMS "may" 
resolve the problem. It is not guaranteed; HMS only optimistically applies this 
optimization. If direct sql on HMS fails, it falls back to ORM and may then fail 
again. Spark's behavior should not be inconsistent depending on an HMS config value. 

My suggested fix would still call getPartitionsByFilter and if that fails, 
will call getAllPartitions. We won't be calling getAllPartitions in all cases. 
It is a fallback mechanism. 

@gatorsmile hmm.. why not? We know it might be slow, hence the warning. 
Maybe the warning message should say that this could be slow, depending on the 
number of partitions, since partition push-down to HMS failed.


---




[GitHub] spark pull request #22614: [SPARK-25561][SQL] HiveClient.getPartitionsByFilt...

2018-10-02 Thread kmanamcheri
Github user kmanamcheri commented on a diff in the pull request:

https://github.com/apache/spark/pull/22614#discussion_r222140323
  
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala ---
@@ -746,34 +746,20 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
         getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]]
       } else {
         logDebug(s"Hive metastore filter is '$filter'.")
-        val tryDirectSqlConfVar = HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL
-        // We should get this config value from the metaStore. otherwise hit SPARK-18681.
-        // To be compatible with hive-0.12 and hive-0.13, In the future we can achieve this by:
-        // val tryDirectSql = hive.getMetaConf(tryDirectSqlConfVar.varname).toBoolean
-        val tryDirectSql = hive.getMSC.getConfigValue(tryDirectSqlConfVar.varname,
-          tryDirectSqlConfVar.defaultBoolVal.toString).toBoolean
         try {
           // Hive may throw an exception when calling this method in some circumstances, such as
-          // when filtering on a non-string partition column when the hive config key
-          // hive.metastore.try.direct.sql is false
+          // when filtering on a non-string partition column.
           getPartitionsByFilterMethod.invoke(hive, table, filter)
             .asInstanceOf[JArrayList[Partition]]
         } catch {
-          case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] &&
-              !tryDirectSql =>
+          case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] =>
             logWarning("Caught Hive MetaException attempting to get partition metadata by " +
               "filter from Hive. Falling back to fetching all partition metadata, which will " +
-              "degrade performance. Modifying your Hive metastore configuration to set " +
-              s"${tryDirectSqlConfVar.varname} to true may resolve this problem.", ex)
+              "degrade performance. Enable direct SQL mode in hive metastore to attempt " +
+              "to improve performance. However, Hive's direct SQL mode is an optimistic " +
+              "optimization and does not guarantee improved performance.")
--- End diff --

Good idea. Yes that would be better. I'll add that.
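For readers following the diff above, the post-change behavior can be sketched roughly as follows. This is a simplified stand-in, not the actual Spark/Hive code: `MetaException`, `getPartitionsByFilter`, `getAllPartitions`, and `prunePartitions` here are local mock names. The point is that any `MetaException` surfaced through reflection now triggers the fallback to fetching all partitions, with no check of the client-side direct SQL setting.

```scala
import java.lang.reflect.InvocationTargetException

// Simplified stand-in for org.apache.hadoop.hive.metastore.api.MetaException
final class MetaException(msg: String) extends Exception(msg)

object PartitionPruningSketch {
  // Simulates getPartitionsByFilterMethod.invoke failing with a wrapped MetaException
  def getPartitionsByFilter(filter: String): Seq[String] =
    throw new InvocationTargetException(new MetaException("filter pushdown not supported"))

  // Simulates getAllPartitionsMethod.invoke
  def getAllPartitions(): Seq[String] = Seq("ds=2018-10-01", "ds=2018-10-02")

  // After this change the catch no longer inspects tryDirectSql:
  // any wrapped MetaException falls back to fetching all partitions.
  def prunePartitions(filter: String): Seq[String] =
    try {
      getPartitionsByFilter(filter)
    } catch {
      case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] =>
        // Callers are expected to handle a superset of the requested partitions.
        getAllPartitions()
    }
}
```

The caller then re-applies the partition predicate on the Spark side, so returning a superset is safe, just slower.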


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22614: [SPARK-25561][SQL] HiveClient.getPartitionsByFilt...

2018-10-02 Thread kmanamcheri
Github user kmanamcheri commented on a diff in the pull request:

https://github.com/apache/spark/pull/22614#discussion_r222123426
  
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala ---
@@ -746,34 +746,20 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
         getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]]
       } else {
         logDebug(s"Hive metastore filter is '$filter'.")
-        val tryDirectSqlConfVar = HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL
-        // We should get this config value from the metaStore. otherwise hit SPARK-18681.
-        // To be compatible with hive-0.12 and hive-0.13, In the future we can achieve this by:
-        // val tryDirectSql = hive.getMetaConf(tryDirectSqlConfVar.varname).toBoolean
-        val tryDirectSql = hive.getMSC.getConfigValue(tryDirectSqlConfVar.varname,
-          tryDirectSqlConfVar.defaultBoolVal.toString).toBoolean
         try {
           // Hive may throw an exception when calling this method in some circumstances, such as
-          // when filtering on a non-string partition column when the hive config key
-          // hive.metastore.try.direct.sql is false
+          // when filtering on a non-string partition column.
           getPartitionsByFilterMethod.invoke(hive, table, filter)
             .asInstanceOf[JArrayList[Partition]]
         } catch {
-          case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] &&
-              !tryDirectSql =>
+          case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] =>
--- End diff --

@gatorsmile From the HMS side, the error is always the same `MetaException`, and there is no way to tell apart a direct SQL error from a "not supported" error (unfortunately!). How do you propose we address this?


---




[GitHub] spark issue #22614: [SPARK-25561][SQL] HiveClient.getPartitionsByFilter shou...

2018-10-02 Thread kmanamcheri
Github user kmanamcheri commented on the issue:

https://github.com/apache/spark/pull/22614
  
@mallman @cloud-fan @ericl @rezasafi 


---




[GitHub] spark pull request #22614: HiveClient.getPartitionsByFilter should not throw...

2018-10-02 Thread kmanamcheri
GitHub user kmanamcheri opened a pull request:

https://github.com/apache/spark/pull/22614

HiveClient.getPartitionsByFilter should not throw an exception if HMS 
retries directSql

## What changes were proposed in this pull request?

When using partition filter pushdown to HMS, Spark should expect a MetaException from HMS if partition filtering is not supported and should call getAllPartitions instead. HMS is expected to throw a MetaException even if directSql is enabled.
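The proposal above, combined with the fallback config discussed later in review (a flag along the lines of `spark.sql.hive.metastorePartitionPruning.fallback.enabled`), can be sketched as follows. All names here are illustrative stand-ins, not Spark's actual API; the metastore calls are passed in as plain functions so the control flow is visible.

```scala
object ConfigGatedPruning {
  // Simplified stand-in for org.apache.hadoop.hive.metastore.api.MetaException
  final class MetaException(msg: String) extends Exception(msg)

  // If pushdown fails with a MetaException, either fall back to listing all
  // partitions (when the hypothetical fallback flag is on) or fail fast.
  def prunePartitions(
      filter: String,
      fallbackEnabled: Boolean,
      byFilter: String => Seq[String],
      allPartitions: () => Seq[String]): Seq[String] =
    try {
      byFilter(filter)
    } catch {
      case _: MetaException if fallbackEnabled =>
        // Superset is acceptable: the caller re-filters on the Spark side.
        allPartitions()
      case ex: MetaException =>
        throw new RuntimeException(
          "Partition pruning failed and fallback is disabled", ex)
    }
}
```

Failing fast when the flag is off avoids silently fetching every partition of a very large table.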

## How was this patch tested?

Unit tests on the Spark SQL component were run successfully.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kmanamcheri/spark SPARK-25561

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22614.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22614


commit dddffcae8824e72d614fd6202e7fc562c490098b
Author: Karthik Manamcheri 
Date:   2018-10-02T16:11:20Z

HiveShim should expect MetaException from HMS in all cases




---




[GitHub] spark pull request #15673: [SPARK-17992][SQL] Return all partitions from Hiv...

2018-09-27 Thread kmanamcheri
Github user kmanamcheri commented on a diff in the pull request:

https://github.com/apache/spark/pull/15673#discussion_r221107390
  
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala ---
@@ -586,17 +587,31 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
         getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]]
       } else {
         logDebug(s"Hive metastore filter is '$filter'.")
+        val tryDirectSqlConfVar = HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL
+        val tryDirectSql =
+          hive.getConf.getBoolean(tryDirectSqlConfVar.varname, tryDirectSqlConfVar.defaultBoolVal)
         try {
+          // Hive may throw an exception when calling this method in some circumstances, such as
+          // when filtering on a non-string partition column when the hive config key
+          // hive.metastore.try.direct.sql is false
           getPartitionsByFilterMethod.invoke(hive, table, filter)
             .asInstanceOf[JArrayList[Partition]]
         } catch {
-          case e: InvocationTargetException =>
-            // SPARK-18167 retry to investigate the flaky test. This should be reverted before
-            // the release is cut.
-            val retry = Try(getPartitionsByFilterMethod.invoke(hive, table, filter))
-            logError("getPartitionsByFilter failed, retry success = " + retry.isSuccess)
-            logError("all partitions: " + getAllPartitions(hive, table))
-            throw e
+          case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] &&
+              !tryDirectSql =>
+            logWarning("Caught Hive MetaException attempting to get partition metadata by " +
+              "filter from Hive. Falling back to fetching all partition metadata, which will " +
+              "degrade performance. Modifying your Hive metastore configuration to set " +
+              s"${tryDirectSqlConfVar.varname} to true may resolve this problem.", ex)
+            // HiveShim clients are expected to handle a superset of the requested partitions
+            getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]]
+          case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] &&
+              tryDirectSql =>
+            throw new RuntimeException("Caught Hive MetaException attempting to get partition " +
--- End diff --

@mallman Your assumption is incorrect. If Hive's direct SQL path fails, it retries with ORM. In this case, I am able to reproduce an issue with Postgres where direct SQL fails, and when Hive retries with ORM, Spark fails. Hive has fallback behavior for direct SQL.

Filed SPARK-25561
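The HMS-side behavior described above can be sketched like this. It is illustrative only, not actual Hive code: direct SQL is an optimistic fast path, HMS internally retries via ORM when it fails, and if the ORM path cannot evaluate the filter either, the client still receives a MetaException even though hive.metastore.try.direct.sql is true. That is why a client-side check of the direct SQL flag cannot predict whether pushdown will succeed.

```scala
object MetastoreSideSketch {
  // Simplified stand-in for org.apache.hadoop.hive.metastore.api.MetaException
  final class MetaException(msg: String) extends Exception(msg)

  // Simulates the direct SQL fast path failing, e.g. a Postgres type mismatch
  // on a non-string partition column.
  def directSqlPartitions(filter: String): Seq[String] =
    throw new RuntimeException("direct SQL failed")

  // Simulates the ORM path, which also cannot evaluate this filter.
  def ormPartitions(filter: String): Seq[String] =
    throw new MetaException("filter expression not supported by ORM")

  // What the client effectively observes: HMS retries internally, yet the
  // MetaException from the ORM retry still reaches the caller.
  def getPartitionsByFilter(filter: String): Seq[String] =
    try {
      directSqlPartitions(filter)
    } catch {
      case _: RuntimeException => ormPartitions(filter) // internal HMS retry
    }
}
```

So even with direct SQL enabled, Spark must be prepared to handle a MetaException from getPartitionsByFilter.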


---
