bharos commented on code in PR #12327:
URL: https://github.com/apache/iceberg/pull/12327#discussion_r1962595085
##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/AddFilesProcedure.java:
##########
@@ -196,11 +193,36 @@ private void importFileTable(
String format,
Map<String, String> partitionFilter,
boolean checkDuplicateFiles,
- PartitionSpec spec,
int parallelism) {
+
+ org.apache.spark.sql.execution.datasources.PartitionSpec inferredSpec =
+ getInferredSpec(spark(), tableLocation);
+
+ List<String> sparkPartNames =
+ JavaConverters.seqAsJavaList(inferredSpec.partitionColumns()).stream()
+ .map(StructField::name)
+ .map(name -> name.toLowerCase(Locale.ROOT))
+ .collect(Collectors.toList());
+ PartitionSpec matchingSpec = null;
+ for (PartitionSpec icebergSpec : table.specs().values()) {
+ List<String> icebergPartNames =
+ icebergSpec.fields().stream()
+ .map(PartitionField::name)
+ .map(name -> name.toLowerCase(Locale.ROOT))
+ .collect(Collectors.toList());
+ if (icebergPartNames.equals(sparkPartNames)) {
+ matchingSpec = icebergSpec;
+ break;
+ }
+ }
+ if (matchingSpec == null) {
+ // If the inferred spec does not match any existing spec, use the Iceberg table's latest spec
+ matchingSpec = table.spec();
Review Comment:
Now using the `findCompatibleSpec` method from `SparkTableUtil`, so this will
throw an error instead of falling back to the table's latest spec.
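For readers following the thread, here is a minimal sketch of the behavior change described above: match the inferred partition column names against each known spec, case-insensitively, and fail when nothing matches rather than defaulting to the latest spec. It is not the `SparkTableUtil.findCompatibleSpec` implementation. The class `SpecMatchSketch`, the method `findCompatibleSpecId`, the map of spec ID to field names, and the exception type are all hypothetical stand-ins chosen so the example runs without Iceberg or Spark on the classpath.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical stand-in: each spec is modeled as an ordered list of partition field names.
class SpecMatchSketch {

  // Returns the ID of the first spec whose field names equal the inferred names
  // (case-insensitive, order-sensitive). Throws if no spec matches; the exception
  // type is an assumption, not taken from the PR.
  static int findCompatibleSpecId(
      Map<Integer, List<String>> specsById, List<String> inferredNames) {
    List<String> wanted = lowerCase(inferredNames);
    for (Map.Entry<Integer, List<String>> entry : specsById.entrySet()) {
      if (lowerCase(entry.getValue()).equals(wanted)) {
        return entry.getKey();
      }
    }
    throw new IllegalArgumentException(
        "Cannot find a partition spec matching inferred columns " + inferredNames);
  }

  private static List<String> lowerCase(List<String> names) {
    return names.stream()
        .map(name -> name.toLowerCase(Locale.ROOT))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    Map<Integer, List<String>> specs = new LinkedHashMap<>();
    specs.put(0, List.of("id"));
    specs.put(1, List.of("id", "Dept"));

    System.out.println(findCompatibleSpecId(specs, List.of("ID", "dept")));
    try {
      findCompatibleSpecId(specs, List.of("region"));
    } catch (IllegalArgumentException e) {
      System.out.println("No fallback: " + e.getMessage());
    }
  }
}
```

Failing loudly here means a mismatch between the directory layout and the table's specs surfaces to the caller instead of files being silently imported under a spec they may not belong to.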
##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/AddFilesProcedure.java:
##########
@@ -265,13 +283,13 @@ public String description() {
return "AddFiles";
}
- private void validatePartitionSpec(Table table, Map<String, String> partitionFilter) {
+ private void validatePartitionFilter(Table table, Map<String, String> partitionFilter) {
List<PartitionField> partitionFields = table.spec().fields();
Set<String> partitionNames =
table.spec().fields().stream().map(PartitionField::name).collect(Collectors.toSet());
Review Comment:
Fixed this to take a partition spec instead of always using the table's latest
spec. This required a small refactor: I moved the identity partition check out
of this function into a separate one so that it runs at the very beginning,
because we can exit early in that case.
`validatePartitionFilter` now runs later, after we find a compatible spec,
because the spec has to be passed to this function.
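To illustrate the shape of that refactor, here is a rough sketch in which the filter validation receives the chosen spec's field names as an argument instead of reading the table's latest spec. The exact checks in the PR are not shown in this thread, so the two checks below (no filter on an unpartitioned spec, every filter key must be a partition field) are assumptions. `FilterValidationSketch` and its method signature are hypothetical, and the spec is modeled as a plain list of names so the example runs without Iceberg.

```java
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for validating a partition filter against an explicitly
// passed spec (modeled as its partition field names), not the table's latest spec.
class FilterValidationSketch {

  static void validatePartitionFilter(
      List<String> specFieldNames, Map<String, String> partitionFilter) {
    if (partitionFilter.isEmpty()) {
      return; // nothing to validate
    }
    if (specFieldNames.isEmpty()) {
      throw new IllegalArgumentException(
          "Cannot use a partition filter with an unpartitioned spec");
    }
    for (String column : partitionFilter.keySet()) {
      if (!specFieldNames.contains(column)) {
        throw new IllegalArgumentException(
            "Partition filter column " + column + " is not in spec fields " + specFieldNames);
      }
    }
  }

  public static void main(String[] args) {
    // In the flow described above, the identity-partition check would run first,
    // then the compatible spec would be looked up, and only then this validation.
    List<String> chosenSpecFields = List.of("id", "dept");
    validatePartitionFilter(chosenSpecFields, Map.of("dept", "hr"));
    System.out.println("filter accepted");
  }
}
```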
##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/procedures/AddFilesProcedure.java:
##########
@@ -196,11 +193,36 @@ private void importFileTable(
String format,
Map<String, String> partitionFilter,
boolean checkDuplicateFiles,
- PartitionSpec spec,
int parallelism) {
+
+ org.apache.spark.sql.execution.datasources.PartitionSpec inferredSpec =
+ getInferredSpec(spark(), tableLocation);
+
+ List<String> sparkPartNames =
Review Comment:
Thanks, I refactored the commonly needed part into a public method. PTAL