xushiyan commented on code in PR #6821:
URL: https://github.com/apache/hudi/pull/6821#discussion_r983066569
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/SqlKeyGenerator.scala:
##########
@@ -45,37 +49,91 @@ class SqlKeyGenerator(props: TypedProperties) extends
ComplexKeyGenerator(props)
None
}
}
- // The origin key generator class for this table.
- private lazy val originKeyGen = {
- val beforeKeyGenClassName =
props.getString(SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME, null)
- if (beforeKeyGenClassName != null && beforeKeyGenClassName.nonEmpty) {
- val keyGenProps = new TypedProperties()
- keyGenProps.putAll(props)
- keyGenProps.remove(SqlKeyGenerator.ORIGIN_KEYGEN_CLASS_NAME)
- val convertedKeyGenClassName =
SqlKeyGenerator.getRealKeyGenClassName(props)
- keyGenProps.put(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key,
convertedKeyGenClassName)
- Some(KeyGenUtils.createKeyGeneratorByClassName(keyGenProps))
- } else {
- None
+
+ private lazy val complexKeyGen = new ComplexKeyGenerator(props)
+ private lazy val originalKeyGen =
+ Option(props.getString(SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME, null))
+ .map { originalKeyGenClassName =>
+ checkArgument(originalKeyGenClassName.nonEmpty)
+
+ val convertedKeyGenClassName =
HoodieSparkKeyGeneratorFactory.convertToSparkKeyGenerator(originalKeyGenClassName)
+
+ val keyGenProps = new TypedProperties()
+ keyGenProps.putAll(props)
+ keyGenProps.remove(SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME)
+ keyGenProps.put(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key,
convertedKeyGenClassName)
+
+
KeyGenUtils.createKeyGeneratorByClassName(keyGenProps).asInstanceOf[SparkKeyGeneratorInterface]
+ }
+
+ override def getRecordKey(record: GenericRecord): String =
+ originalKeyGen.map {
+ _.getKey(record).getRecordKey
+ } getOrElse {
+ complexKeyGen.getRecordKey(record)
}
+
+ override def getRecordKey(row: Row): String =
+ originalKeyGen.map {
+ _.getRecordKey(row)
+ } getOrElse {
+ complexKeyGen.getRecordKey(row)
+ }
+
+
+ override def getRecordKey(internalRow: InternalRow, schema: StructType):
UTF8String =
+ originalKeyGen.map {
+ _.getRecordKey(internalRow, schema)
+ } getOrElse {
+ complexKeyGen.getRecordKey(internalRow, schema)
+ }
+
+ override def getPartitionPath(record: GenericRecord): String = {
+ val partitionPath = originalKeyGen.map {
+ _.getKey(record).getPartitionPath
+ } getOrElse {
+ complexKeyGen.getPartitionPath(record)
+ }
+
+ convertPartitionPathToSqlType(partitionPath, rowType = false)
}
- override def getRecordKey(record: GenericRecord): String = {
- if (originKeyGen.isDefined) {
- originKeyGen.get.getKey(record).getRecordKey
- } else {
- super.getRecordKey(record)
+ override def getPartitionPath(row: Row): String = {
+ val partitionPath = originalKeyGen.map {
+ _.getPartitionPath(row)
+ } getOrElse {
+ complexKeyGen.getPartitionPath(row)
}
+
+ convertPartitionPathToSqlType(partitionPath, rowType = true)
}
- override def getRecordKey(row: Row): String = {
- if (originKeyGen.isDefined) {
-
originKeyGen.get.asInstanceOf[SparkKeyGeneratorInterface].getRecordKey(row)
- } else {
- super.getRecordKey(row)
+ override def getPartitionPath(internalRow: InternalRow, schema: StructType):
UTF8String = {
+ val partitionPath = originalKeyGen.map {
+ _.getPartitionPath(internalRow, schema)
+ } getOrElse {
+ complexKeyGen.getPartitionPath(internalRow, schema)
+ }
+
+
UTF8String.fromString(convertPartitionPathToSqlType(partitionPath.toString,
rowType = true))
+ }
+
+ override def getRecordKeyFieldNames: util.List[String] = {
+ originalKeyGen.map(_.getRecordKeyFieldNames)
+ .getOrElse(complexKeyGen.getRecordKeyFieldNames)
+ }
+
+ override def getPartitionPathFields: util.List[String] = {
+ originalKeyGen.map {
+ case bkg: BaseKeyGenerator => bkg.getPartitionPathFields
+ case _ =>
+
Option(super.getPartitionPathFields).getOrElse(Collections.emptyList[String])
Review Comment:
I don't quite understand this part. `super.getPartitionPathFields` also resolves
to `BaseKeyGenerator`. Can you help clarify, please?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
users@infra.apache.org