pvary commented on code in PR #10331:
URL: https://github.com/apache/iceberg/pull/10331#discussion_r1599571779
##########
flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/AggregatedStatisticsTracker.java:
##########
@@ -104,30 +144,135 @@ AggregatedStatistics<D, S> updateAndCheckCompletion(
subtask,
checkpointId);
} else {
- inProgressStatistics.mergeDataStatistic(
+ merge(dataStatistics);
+ LOG.debug(
+ "Merge data statistics from operator {} subtask {} for checkpoint
{}.",
operatorName,
- event.checkpointId(),
- DataStatisticsUtil.deserializeDataStatistics(
- event.statisticsBytes(), statisticsSerializer));
+ subtask,
+ checkpointId);
}
+ // This should be the happy path where all subtasks reports are received
if (inProgressSubtaskSet.size() == parallelism) {
- completedStatistics = inProgressStatistics;
+ completedStatistics = completedStatistics();
+ resetAggregates();
LOG.info(
- "Received data statistics from all {} operators {} for checkpoint
{}. Return last completed aggregator {}.",
+ "Received data statistics from all {} operators {} for checkpoint
{}. Return last completed aggregator.",
parallelism,
operatorName,
- inProgressStatistics.checkpointId(),
- completedStatistics.dataStatistics());
- inProgressStatistics = new AggregatedStatistics<>(checkpointId + 1,
statisticsSerializer);
- inProgressSubtaskSet.clear();
+ inProgressCheckpointId);
}
return completedStatistics;
}
+ private boolean inProgress() {
+ return inProgressCheckpointId != CheckpointStoreUtil.INVALID_CHECKPOINT_ID;
+ }
+
+ private AggregatedStatistics completedStatistics() {
+ if (coordinatorStatisticsType == StatisticsType.Map) {
+ LOG.info(
+ "Completed map statistics aggregation with {} keys",
coordinatorMapStatistics.size());
+ return AggregatedStatistics.fromKeyFrequency(
+ inProgressCheckpointId, coordinatorMapStatistics);
+ } else {
+ ReservoirItemsSketch<SortKey> sketch =
coordinatorSketchStatistics.getResult();
+ LOG.info(
+ "Completed sketch statistics aggregation: "
+ + "reservoir size = {}, number of items seen = {}, number of
samples = {}",
+ sketch.getK(),
+ sketch.getN(),
+ sketch.getNumSamples());
+ return AggregatedStatistics.fromRangeBounds(
+ inProgressCheckpointId,
+ SketchUtil.rangeBounds(downstreamParallelism, comparator, sketch));
+ }
+ }
+
+ private void initializeAggregates(long checkpointId, DataStatistics
taskStatistics) {
+ LOG.info("Starting a new statistics aggregation for checkpoint {}",
checkpointId);
+ this.inProgressCheckpointId = checkpointId;
+ this.coordinatorStatisticsType = taskStatistics.type();
+
+ if (coordinatorStatisticsType == StatisticsType.Map) {
+ this.coordinatorMapStatistics = Maps.newHashMap();
+ this.coordinatorSketchStatistics = null;
+ } else {
+ this.coordinatorMapStatistics = null;
+ this.coordinatorSketchStatistics =
+ ReservoirItemsUnion.newInstance(
+
SketchUtil.determineCoordinatorReservoirSize(downstreamParallelism));
+ }
+ }
+
+ private void resetAggregates() {
+ inProgressSubtaskSet.clear();
+ this.inProgressCheckpointId = CheckpointStoreUtil.INVALID_CHECKPOINT_ID;
+ this.coordinatorMapStatistics = null;
+ this.coordinatorSketchStatistics = null;
+ }
+
+ @SuppressWarnings("unchecked")
+ private void merge(DataStatistics taskStatistics) {
+ if (taskStatistics.type() == StatisticsType.Map) {
+ Map<SortKey, Long> taskMapStats = (Map<SortKey, Long>)
taskStatistics.result();
+ if (coordinatorStatisticsType == StatisticsType.Map) {
+ taskMapStats.forEach((key, count) ->
coordinatorMapStatistics.merge(key, count, Long::sum));
+ if (coordinatorMapStatistics.size() > switchToSketchThreshold) {
+ convertCoordinatorToSketch();
+ }
+ } else {
+ // convert task stats to sketch first
+ ReservoirItemsSketch<SortKey> taskSketch =
+ ReservoirItemsSketch.newInstance(
+ SketchUtil.determineOperatorReservoirSize(parallelism,
downstreamParallelism));
+ SketchUtil.convertMapToSketch(taskMapStats, taskSketch::update);
+ coordinatorSketchStatistics.update(taskSketch);
Review Comment:
I'm wondering which approach is better:
1. Receiving a map from the task, converting that task map to a sketch, and then
merging it into the coordinator sketch
2. Updating the coordinator sketch directly by adding the values from the map
Which one performs better? Which one results in a better approximation in the
resulting sketch?
If we deliberately go with the first option, then we probably want to notify the
tasks when the coordinator switches to sketch statistics, so that they stop sending
the whole map and send only their sketch instead (which might be a smaller message).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]