[GitHub] [hudi] vinothchandar commented on a diff in pull request #5737: [HUDI-4178][Stacked on 5733] Fixing `HoodieSpark3Analysis` missing to pass schema from Spark Catalog

2022-06-03 Thread GitBox


vinothchandar commented on code in PR #5737:
URL: https://github.com/apache/hudi/pull/5737#discussion_r889028951


##
hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark3Analysis.scala:
##
@@ -45,16 +45,22 @@ case class HoodieSpark3Analysis(sparkSession: SparkSession) extends Rule[LogicalPlan]
   with SparkAdapterSupport with ProvidesHoodieConfig {
 
   override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsDown {
-    case dsv2 @ DataSourceV2Relation(d: HoodieInternalV2Table, _, _, _, _) =>
-      val output = dsv2.output
-      val catalogTable = if (d.catalogTable.isDefined) {
-        Some(d.v1Table)
-      } else {
-        None
-      }
-      val relation = new DefaultSource().createRelation(new SQLContext(sparkSession),
-        buildHoodieConfig(d.hoodieCatalogTable))
-      LogicalRelation(relation, output, catalogTable, isStreaming = false)
+    // NOTE: This step is required since Hudi relations don't currently implement DS V2 Read API
+    case dsv2 @ DataSourceV2Relation(tbl: HoodieInternalV2Table, _, _, _, _) =>
+      val qualifiedTableName = QualifiedTableName(tbl.v1Table.database, tbl.v1Table.identifier.table)
+      val catalog = sparkSession.sessionState.catalog
+
+      catalog.getCachedPlan(qualifiedTableName, () => {

Review Comment:
   I am just asking for ideas to fix this. 
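   One possible direction, offered only as a sketch and not something decided in this thread: have the Hudi write path explicitly evict the cached plan once a commit completes, so the next read re-resolves the relation against the latest table schema. This assumes the Spark 3.2/3.3 SessionCatalog API; the helper name below is hypothetical.

   import org.apache.spark.sql.SparkSession
   import org.apache.spark.sql.catalyst.QualifiedTableName

   // Hypothetical helper, not part of this PR: evict the cached plan for a Hudi
   // table once a write commits, so a subsequent query rebuilds the relation
   // instead of reusing a stale cached plan.
   def invalidateHudiPlanCache(spark: SparkSession, db: String, table: String): Unit = {
     val key = QualifiedTableName(db, table)
     // SessionCatalog keeps a cache of resolved plans keyed by QualifiedTableName;
     // getCachedPlan (used in HoodieSpark3Analysis) populates it, and
     // invalidateCachedTable removes the entry.
     spark.sessionState.catalog.invalidateCachedTable(key)
   }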



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [hudi] vinothchandar commented on a diff in pull request #5737: [HUDI-4178][Stacked on 5733] Fixing `HoodieSpark3Analysis` missing to pass schema from Spark Catalog

2022-06-02 Thread GitBox


vinothchandar commented on code in PR #5737:
URL: https://github.com/apache/hudi/pull/5737#discussion_r888475377


##
hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark3Analysis.scala:
##
@@ -45,16 +45,22 @@ case class HoodieSpark3Analysis(sparkSession: SparkSession) extends Rule[LogicalPlan]
   with SparkAdapterSupport with ProvidesHoodieConfig {
 
   override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsDown {
-    case dsv2 @ DataSourceV2Relation(d: HoodieInternalV2Table, _, _, _, _) =>
-      val output = dsv2.output
-      val catalogTable = if (d.catalogTable.isDefined) {
-        Some(d.v1Table)
-      } else {
-        None
-      }
-      val relation = new DefaultSource().createRelation(new SQLContext(sparkSession),
-        buildHoodieConfig(d.hoodieCatalogTable))
-      LogicalRelation(relation, output, catalogTable, isStreaming = false)
+    // NOTE: This step is required since Hudi relations don't currently implement DS V2 Read API
+    case dsv2 @ DataSourceV2Relation(tbl: HoodieInternalV2Table, _, _, _, _) =>
+      val qualifiedTableName = QualifiedTableName(tbl.v1Table.database, tbl.v1Table.identifier.table)
+      val catalog = sparkSession.sessionState.catalog
+
+      catalog.getCachedPlan(qualifiedTableName, () => {

Review Comment:
   @leesf I propose we revert to V1 and push out 0.11.1 to fix these perf regressions. Do you see any concerns with that?
   
   @YannByron would any of the follow-on SQL work break if we revert?






[GitHub] [hudi] vinothchandar commented on a diff in pull request #5737: [HUDI-4178][Stacked on 5733] Fixing `HoodieSpark3Analysis` missing to pass schema from Spark Catalog

2022-06-02 Thread GitBox


vinothchandar commented on code in PR #5737:
URL: https://github.com/apache/hudi/pull/5737#discussion_r888460126


##
hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark3Analysis.scala:
##
@@ -45,16 +45,22 @@ case class HoodieSpark3Analysis(sparkSession: SparkSession) extends Rule[LogicalPlan]
   with SparkAdapterSupport with ProvidesHoodieConfig {
 
   override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsDown {
-    case dsv2 @ DataSourceV2Relation(d: HoodieInternalV2Table, _, _, _, _) =>
-      val output = dsv2.output
-      val catalogTable = if (d.catalogTable.isDefined) {
-        Some(d.v1Table)
-      } else {
-        None
-      }
-      val relation = new DefaultSource().createRelation(new SQLContext(sparkSession),
-        buildHoodieConfig(d.hoodieCatalogTable))
-      LogicalRelation(relation, output, catalogTable, isStreaming = false)
+    // NOTE: This step is required since Hudi relations don't currently implement DS V2 Read API
+    case dsv2 @ DataSourceV2Relation(tbl: HoodieInternalV2Table, _, _, _, _) =>
+      val qualifiedTableName = QualifiedTableName(tbl.v1Table.database, tbl.v1Table.identifier.table)
+      val catalog = sparkSession.sessionState.catalog
+
+      catalog.getCachedPlan(qualifiedTableName, () => {

Review Comment:
   So the issue is that this cache is never invalidated on write, and V1 does not have a notion of a Catalog (to be used for writes).
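   For context, a minimal sketch of the caching behavior under discussion, assuming the Spark 3.2/3.3 SessionCatalog API (everything other than getCachedPlan below is hypothetical): the callable runs only on a cache miss, so a plan cached before a write keeps being served afterwards unless something evicts the entry.

   import java.util.concurrent.Callable
   import org.apache.spark.sql.SparkSession
   import org.apache.spark.sql.catalyst.QualifiedTableName
   import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

   // Sketch only: resolve a table's plan through SessionCatalog's plan cache.
   // buildPlan is invoked only when the key is absent, so once a plan is cached
   // it is returned as-is; a later write that changes the table's schema or
   // file layout is not reflected unless the cache entry is invalidated.
   def resolveThroughPlanCache(spark: SparkSession, db: String, table: String)
                              (buildPlan: () => LogicalPlan): LogicalPlan = {
     val key = QualifiedTableName(db, table)
     spark.sessionState.catalog.getCachedPlan(key, new Callable[LogicalPlan] {
       override def call(): LogicalPlan = buildPlan() // only on a cache miss
     })
   }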



##
hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark3Analysis.scala:
##
@@ -45,16 +45,22 @@ case class HoodieSpark3Analysis(sparkSession: SparkSession) extends Rule[LogicalPlan]
   with SparkAdapterSupport with ProvidesHoodieConfig {
 
   override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsDown {
-    case dsv2 @ DataSourceV2Relation(d: HoodieInternalV2Table, _, _, _, _) =>
-      val output = dsv2.output
-      val catalogTable = if (d.catalogTable.isDefined) {
-        Some(d.v1Table)
-      } else {
-        None
-      }
-      val relation = new DefaultSource().createRelation(new SQLContext(sparkSession),
-        buildHoodieConfig(d.hoodieCatalogTable))
-      LogicalRelation(relation, output, catalogTable, isStreaming = false)
+    // NOTE: This step is required since Hudi relations don't currently implement DS V2 Read API
+    case dsv2 @ DataSourceV2Relation(tbl: HoodieInternalV2Table, _, _, _, _) =>
+      val qualifiedTableName = QualifiedTableName(tbl.v1Table.database, tbl.v1Table.identifier.table)
+      val catalog = sparkSession.sessionState.catalog
+
+      catalog.getCachedPlan(qualifiedTableName, () => {

Review Comment:
   So the issue is that this cache is never invalidated on write, and V1 does not have a notion of a Catalog (to be used for writes)?


