RussellSpitzer commented on code in PR #15150:
URL: https://github.com/apache/iceberg/pull/15150#discussion_r2760079638
##########
spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java:
##########
@@ -54,6 +54,7 @@ private SparkWriteOptions() {}
public static final String REWRITTEN_FILE_SCAN_TASK_SET_ID =
"rewritten-file-scan-task-set-id";
public static final String OUTPUT_SPEC_ID = "output-spec-id";
+ public static final String OUTPUT_SORT_ORDER_ID = "output-sort-order-id";
Review Comment:
I guess what I was thinking, and let me know if this doesn't make sense: at some point we decide what the Spark sort distribution/ordering is. At that moment we know whether or not the files we write will match a table sort order, and we can carry the id through.
Something like:
```java
public Integer outputSortOrderId(SparkWriteRequirements writeRequirements) {
  if (writeRequirements == null || writeRequirements.ordering().length == 0) {
    return SortOrder.unsorted().orderId();
  }

  org.apache.spark.sql.connector.expressions.SortOrder[] sparkOrdering =
      writeRequirements.ordering();

  // Find the first table sort order that is satisfied by the Spark ordering
  for (SortOrder tableSortOrder : table.sortOrders().values()) {
    if (tableSortOrder.isUnsorted()) {
      continue;
    }

    // This is what we would expect this sort order to produce
    SortOrder fullSortOrder =
        SortOrderUtil.buildSortOrder(table.schema(), table.spec(), tableSortOrder);

    // Convert our expected ordering to a Spark one
    org.apache.spark.sql.connector.expressions.SortOrder[] expectedSparkOrdering =
        Spark3Util.toOrdering(fullSortOrder);

    if (orderingSatisfies(sparkOrdering, expectedSparkOrdering)) {
      return tableSortOrder.orderId();
    }
  }

  // If no match is found, return unsorted
  return SortOrder.unsorted().orderId();
}

/**
 * I asked cursor to generate this; I think there may be much better ways to do this, or at
 * least we could make a dedicated utility method.
 */
private boolean orderingSatisfies(
    org.apache.spark.sql.connector.expressions.SortOrder[] actualOrdering,
    org.apache.spark.sql.connector.expressions.SortOrder[] expectedOrdering) {
  if (actualOrdering.length < expectedOrdering.length) {
    return false;
  }

  for (int i = 0; i < expectedOrdering.length; i++) {
    org.apache.spark.sql.connector.expressions.SortOrder actual = actualOrdering[i];
    org.apache.spark.sql.connector.expressions.SortOrder expected = expectedOrdering[i];

    // Check if expressions match
    if (!expressionsMatch(actual.expression(), expected.expression())) {
      return false;
    }

    // Check if direction matches
    if (actual.direction() != expected.direction()) {
      return false;
    }

    // Check if null order matches
    if (actual.nullOrdering() != expected.nullOrdering()) {
      return false;
    }
  }

  return true;
}

private boolean expressionsMatch(
    org.apache.spark.sql.connector.expressions.Expression expr1,
    org.apache.spark.sql.connector.expressions.Expression expr2) {
  return expr1.toString().equals(expr2.toString());
}
```
If we do that, then we can just pass through the int we want to mark the write with. If Spark decides to do weird things to the required ordering, it doesn't matter to us; we are only comparing what it actually uses against the known sort orders.
Then we just give the id to the writerFactory -- I have more code if you want to see it, but I just threw it into cursor so it probably still needs some work.
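Roughly, the hand-off could look like the snippet below (just a sketch, not the real wiring -- the factory name and its other constructor arguments are placeholders; the only point is threading `sortOrderId` through):
```java
// Sketch only: resolve the id once when the writer factory is built, then carry it along.
// WriterFactory and its other constructor args stand in for whatever the write path uses.
private WriterFactory createWriterFactory() {
  // Which table sort order (if any) does the required Spark ordering satisfy?
  int sortOrderId = outputSortOrderId(writeRequirements);

  // Hypothetical extra argument: the factory keeps the id and stamps it on the metadata
  // of every data file it produces, instead of each writer trying to re-derive it.
  return new WriterFactory(table, format, writeSchema, dsSchema, sortOrderId);
}
```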