alexeykudinkin commented on code in PR #6821:
URL: https://github.com/apache/hudi/pull/6821#discussion_r983069105


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala:
##########
@@ -45,37 +49,91 @@ class SqlKeyGenerator(props: TypedProperties) extends 
ComplexKeyGenerator(props)
       None
     }
   }
-  // The origin key generator class for this table.
-  private lazy val originKeyGen = {
-    val beforeKeyGenClassName = 
props.getString(SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME, null)
-    if (beforeKeyGenClassName != null && beforeKeyGenClassName.nonEmpty) {
-      val keyGenProps = new TypedProperties()
-      keyGenProps.putAll(props)
-      keyGenProps.remove(SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME)
-      val convertedKeyGenClassName = 
SqlKeyGenerator.getRealKeyGenClassName(props)
-      keyGenProps.put(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key, 
convertedKeyGenClassName)
-      Some(KeyGenUtils.createKeyGeneratorByClassName(keyGenProps))
-    } else {
-      None
+
+  private lazy val complexKeyGen = new ComplexKeyGenerator(props)
+  private lazy val originalKeyGen =
+    Option(props.getString(SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME, null))
+      .map { originalKeyGenClassName =>
+        checkArgument(originalKeyGenClassName.nonEmpty)
+
+        val convertedKeyGenClassName = 
HoodieSparkKeyGeneratorFactory.convertToSparkKeyGenerator(originalKeyGenClassName)
+
+        val keyGenProps = new TypedProperties()
+        keyGenProps.putAll(props)
+        keyGenProps.remove(SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME)
+        keyGenProps.put(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key, 
convertedKeyGenClassName)
+
+        
KeyGenUtils.createKeyGeneratorByClassName(keyGenProps).asInstanceOf[SparkKeyGeneratorInterface]
+      }
+
+  override def getRecordKey(record: GenericRecord): String =
+    originalKeyGen.map {
+      _.getKey(record).getRecordKey
+    } getOrElse {
+      complexKeyGen.getRecordKey(record)
     }
+
+  override def getRecordKey(row: Row): String =
+    originalKeyGen.map {
+      _.getRecordKey(row)
+    } getOrElse {
+      complexKeyGen.getRecordKey(row)
+    }
+
+
+  override def getRecordKey(internalRow: InternalRow, schema: StructType): 
UTF8String =
+    originalKeyGen.map {
+      _.getRecordKey(internalRow, schema)
+    } getOrElse {
+      complexKeyGen.getRecordKey(internalRow, schema)
+    }
+
+  override def getPartitionPath(record: GenericRecord): String = {
+    val partitionPath = originalKeyGen.map {
+      _.getKey(record).getPartitionPath
+    } getOrElse {
+      complexKeyGen.getPartitionPath(record)
+    }
+
+    convertPartitionPathToSqlType(partitionPath, rowType = false)
   }
 
-  override def getRecordKey(record: GenericRecord): String = {
-    if (originKeyGen.isDefined) {
-      originKeyGen.get.getKey(record).getRecordKey
-    } else {
-      super.getRecordKey(record)
+  override def getPartitionPath(row: Row): String = {
+    val partitionPath = originalKeyGen.map {
+      _.getPartitionPath(row)
+    } getOrElse {
+      complexKeyGen.getPartitionPath(row)
     }
+
+    convertPartitionPathToSqlType(partitionPath, rowType = true)
   }
 
-  override def getRecordKey(row: Row): String = {
-    if (originKeyGen.isDefined) {
-      
originKeyGen.get.asInstanceOf[SparkKeyGeneratorInterface].getRecordKey(row)
-    } else {
-      super.getRecordKey(row)
+  override def getPartitionPath(internalRow: InternalRow, schema: StructType): 
UTF8String = {
+    val partitionPath = originalKeyGen.map {
+      _.getPartitionPath(internalRow, schema)
+    } getOrElse {
+      complexKeyGen.getPartitionPath(internalRow, schema)
+    }
+
+    
UTF8String.fromString(convertPartitionPathToSqlType(partitionPath.toString, 
rowType = true))
+  }
+
+  override def getRecordKeyFieldNames: util.List[String] = {
+    originalKeyGen.map(_.getRecordKeyFieldNames)
+      .getOrElse(complexKeyGen.getRecordKeyFieldNames)
+  }
+
+  override def getPartitionPathFields: util.List[String] = {
+    originalKeyGen.map {
+      case bkg: BaseKeyGenerator => bkg.getPartitionPathFields
+      case _ =>
+        
Option(super.getPartitionPathFields).getOrElse(Collections.emptyList[String])

Review Comment:
   `bkg` in this context is `originalKeyGen` itself: the pattern match just binds it as a `BaseKeyGenerator`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

Reply via email to