cxzl25 commented on a change in pull request #34431:
URL: https://github.com/apache/spark/pull/34431#discussion_r744273834



##########
File path: sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
##########
@@ -1106,26 +1109,84 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
           // enabled.
           recordHiveCall()
           getPartitionsByFilterMethod.invoke(hive, table, filter)
-            .asInstanceOf[JArrayList[Partition]]
+            .asInstanceOf[JArrayList[Partition]].asScala.toSeq
         } catch {
           case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] &&
               shouldFallback =>
             logWarning("Caught Hive MetaException attempting to get partition metadata by " +
               "filter from Hive. Falling back to fetching all partition metadata, which will " +
               "degrade performance. Modifying your Hive metastore configuration to set " +
               s"${tryDirectSqlConfVar.varname} to true (if it is not true already) may resolve " +
-              "this problem. Otherwise, to avoid degraded performance you can set " +
+              "this problem. Or you can enable " +
+              s"${SQLConf.HIVE_METASTORE_PARTITION_PRUNING_FAST_FALLBACK.key} " +
+              "to alleviate performance downgrade. " +
+              "Otherwise, to avoid degraded performance you can set " +
              s"${SQLConf.HIVE_METASTORE_PARTITION_PRUNING_FALLBACK_ON_EXCEPTION.key} " +
              " to false and let the query fail instead.", ex)
            // HiveShim clients are expected to handle a superset of the requested partitions
-            recordHiveCall()
-            getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]]
+            prunePartitionsFastFallback(hive, table, catalogTable, predicates)
          case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] =>
            throw QueryExecutionErrors.getPartitionMetadataByFilterError(ex)
        }
      }
 
-    partitions.asScala.toSeq
+    partitions
+  }
+
+  private def prunePartitionsFastFallback(
+      hive: Hive,
+      table: Table,
+      catalogTable: CatalogTable,
+      predicates: Seq[Expression]): Seq[Partition] = {
+    val timeZoneId = SQLConf.get.sessionLocalTimeZone
+
+    // Because there is no way to know whether the partition properties has timeZone,
+    // client-side filtering cannot be used with TimeZoneAwareExpression.
+    def hasTimeZoneAwareExpression(e: Expression): Boolean = {
+      e.collectFirst {
+        case t: TimeZoneAwareExpression => t
+      }.isDefined
+    }
+
+    if (!SQLConf.get.metastorePartitionPruningFastFallback ||
+        predicates.isEmpty ||
+        predicates.exists(hasTimeZoneAwareExpression)) {
+      recordHiveCall()
+      getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]].asScala.toSeq
+    } else {
+      try {
+        val partitionSchema = CharVarcharUtils.replaceCharVarcharWithStringInSchema(
+          catalogTable.partitionSchema)
+        val boundPredicate = ExternalCatalogUtils.generatePartitionPredicateByFilter(
+          catalogTable, partitionSchema, predicates)
+
+        def toRow(spec: TablePartitionSpec): InternalRow = {
+          InternalRow.fromSeq(partitionSchema.map { field =>
+            val partValue = if (spec(field.name) == ExternalCatalogUtils.DEFAULT_PARTITION_NAME) {
+              null
+            } else {
+              spec(field.name)
+            }
+            Cast(Literal(partValue), field.dataType, Option(timeZoneId)).eval()
+          })
+        }
+
+        val allPartitionNames = hive.getPartitionNames(
+          table.getDbName, table.getTableName, -1).asScala
+        val partNames = allPartitionNames.filter { p =>
+          val spec = PartitioningUtils.parsePathFragment(p)
+          boundPredicate.eval(toRow(spec))
+        }
+        recordHiveCall()
+        hive.getPartitionsByNames(table, partNames.asJava).asScala.toSeq
+      } catch {
+        case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] =>
+          logWarning("Caught Hive MetaException attempting to get partition metadata by " +
+            "filter from client side. Falling back to fetching all partition metadata", ex)
+          recordHiveCall()
+          getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]].asScala.toSeq

Review comment:
       Thank you for your suggestion; I have modified the code.
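
       As context for this change, here is a minimal, self-contained sketch of the client-side pruning idea behind prunePartitionsFastFallback: fetch only partition names from the metastore, filter them locally against the pushed-down predicates, then request just the surviving partitions. This is plain Scala with no Spark or Hive dependency; the partition names, the parsePathFragment helper, and the predicate below are made-up illustrations rather than the actual helpers used in the patch.

           object PartitionNamePruningSketch {
             // Parse a Hive partition name like "ds=2021-11-01/hr=10" into a
             // column -> value map, roughly what PartitioningUtils.parsePathFragment does.
             def parsePathFragment(name: String): Map[String, String] =
               name.split("/").map { kv =>
                 val Array(k, v) = kv.split("=", 2)
                 k -> v
               }.toMap

             def main(args: Array[String]): Unit = {
               // Hypothetical names as returned by Hive#getPartitionNames.
               val allPartitionNames = Seq(
                 "ds=2021-11-01/hr=09",
                 "ds=2021-11-01/hr=10",
                 "ds=2021-11-02/hr=10")

               // Stand-in for the bound predicate built by
               // ExternalCatalogUtils.generatePartitionPredicateByFilter: keep ds = '2021-11-01'.
               val predicate: Map[String, String] => Boolean = spec => spec("ds") == "2021-11-01"

               // Client-side pruning: filter the names first; only the survivors would then
               // be fetched from the metastore via getPartitionsByNames.
               val partNames = allPartitionNames.filter(n => predicate(parsePathFragment(n)))
               println(partNames.mkString(", "))  // ds=2021-11-01/hr=09, ds=2021-11-01/hr=10
             }
           }

       As the diff shows, this path is gated by SQLConf.HIVE_METASTORE_PARTITION_PRUNING_FAST_FALLBACK (assuming the key resolves to spark.sql.hive.metastorePartitionPruningFastFallback); when that flag is off, when there are no predicates, or when a predicate is time-zone aware, the fallback still fetches all partition metadata as before.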




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
