Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6062#discussion_r190517094 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerHeap.java --- @@ -458,24 +458,33 @@ private int globalKeyGroupToLocalIndex(int keyGroup) { return keyGroup - keyGroupRange.getStartKeyGroup(); } - private void checkCapacity(int requested) { + private void growIfRequired(int requiredSize) { int oldArraySize = queue.length; - if (requested >= oldArraySize) { + if (requiredSize >= oldArraySize) { final int grow = (oldArraySize < 64) ? oldArraySize + 2 : oldArraySize >> 1; - int newArraySize = oldArraySize + grow; - if (newArraySize - MAX_ARRAY_SIZE > 0) { - if (newArraySize < 0 || requested > MAX_ARRAY_SIZE) { - throw new OutOfMemoryError("Required timer heap exceeds maximum size!"); - } else { - newArraySize = MAX_ARRAY_SIZE; - } - } - queue = Arrays.copyOf(queue, newArraySize); + resizeQueueArray(oldArraySize + grow); } // TODO implement shrinking as well? } + private void resizeForBulkLoad(int maxTotalSize) { + if (maxTotalSize > queue.length) { + resizeQueueArray(maxTotalSize + (maxTotalSize / 8)); --- End diff -- `maxTotalSize / 8` -> `maxTotalSize >>> 3`
---