stevenzwu commented on a change in pull request #2898:
URL: https://github.com/apache/iceberg/pull/2898#discussion_r693618065
##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -369,6 +373,27 @@ private String operatorName(String suffix) {
default:
throw new RuntimeException("Unrecognized write.distribution-mode: " + writeMode);
}
+
+ if (keySelector != null) {
+ return input.keyBy(keySelector);
+ }
+ return input;
+ }
+
+ private KeySelector<RowData, String> getKeySelector(List<Integer> equalityFieldIds, PartitionSpec partitionSpec,
+ Schema schema, RowType rowType) {
+ boolean hasPrimaryKey = equalityFieldIds != null && !equalityFieldIds.isEmpty();
+ boolean hasPartitionKey = partitionSpec != null && !partitionSpec.isUnpartitioned();
+
+ if (hasPrimaryKey && hasPartitionKey) {
+ return new CombinedKeySelector(partitionSpec, equalityFieldIds, schema, rowType);
Review comment:
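This defeats the purpose of hash distribution, which is to make sure that one writer task processes the data from one partition. Keying on both the partition and the equality fields spreads the rows of a single partition across many writer tasks, so this `CombinedKeySelector` would nullify that goal. It seems to me that hash shuffle and equality-key shuffle shouldn't be enabled at the same time.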
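To make the point concrete, here is a minimal, self-contained Java sketch that does not depend on Flink. `Row`, `subtaskFor`, and `writersForPartition` are hypothetical names, and `subtaskFor` is a simplified modulo-hash stand-in for Flink's key-group assignment (an assumption), so real subtask numbers will differ. It counts how many writer subtasks receive rows from a single partition when keying by the partition alone versus by partition plus an equality-key column.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Function;

public class KeySelectorSpreadDemo {

  // Hypothetical row: a partition value plus one equality-key column.
  static final class Row {
    final String partition;
    final int id;

    Row(String partition, int id) {
      this.partition = partition;
      this.id = id;
    }
  }

  // Simplified stand-in for Flink's key-to-subtask assignment (assumption:
  // real Flink hashes keys into key groups first, but the spreading effect is similar).
  static int subtaskFor(Object key, int parallelism) {
    return Math.floorMod(key.hashCode(), parallelism);
  }

  // Returns the distinct writer subtasks that receive rows of the given partition.
  static Set<Integer> writersForPartition(
      List<Row> rows, String partition, Function<Row, Object> keySelector, int parallelism) {
    Set<Integer> subtasks = new TreeSet<>();
    for (Row row : rows) {
      if (row.partition.equals(partition)) {
        subtasks.add(subtaskFor(keySelector.apply(row), parallelism));
      }
    }
    return subtasks;
  }

  public static void main(String[] args) {
    int parallelism = 4;
    List<Row> rows = new ArrayList<>();
    for (int id = 1; id <= 8; id++) {
      rows.add(new Row("p1", id)); // eight rows, all in partition "p1"
    }

    // Partition-only key: every row of "p1" lands on the same writer.
    Function<Row, Object> partitionKey = r -> r.partition;
    // Partition + equality key, analogous to the CombinedKeySelector under review.
    Function<Row, Object> combinedKey = r -> r.partition + "|" + r.id;

    System.out.println(writersForPartition(rows, "p1", partitionKey, parallelism)); // prints [1]
    System.out.println(writersForPartition(rows, "p1", combinedKey, parallelism)); // prints [0, 1, 2, 3]
  }
}
```

With the partition-only key, one subtask owns partition `p1`; with the combined key, the same partition fans out to all four subtasks, so each writer produces its own small files for it. This is why the sketch picks a single key per distribution mode rather than combining them.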
--
This is an automated message from the Apache Git Service.