[ https://issues.apache.org/jira/browse/FLINK-9423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16493607#comment-16493607 ]
ASF GitHub Bot commented on FLINK-9423: --------------------------------------- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6062#discussion_r191436243 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerHeapInternalTimer.java --- @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.CompatibilityResult; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; + +import java.io.IOException; + +/** + * Implementation of {@link InternalTimer} for the {@link InternalTimerHeap}. + * + * @param <K> Type of the keys to which timers are scoped. + * @param <N> Type of the namespace to which timers are scoped. + */ +@Internal +public final class TimerHeapInternalTimer<K, N> implements InternalTimer<K, N> { + + /** The index that indicates that a tracked internal timer is not tracked. */ + private static final int NOT_MANAGED_BY_TIMER_QUEUE_INDEX = Integer.MIN_VALUE; + + private final long timestamp; + + private final K key; + + private final N namespace; + + /** + * This field holds the current physical index of this timer when it is managed by a timer heap so that we can + * support fast deletes. + */ + private transient int timerHeapIndex; --- End diff -- `TimerHeapInternalTimer` is non serializable. Thus, the `transient` keyword should not be needed. > Implement efficient deletes for heap based timer service > -------------------------------------------------------- > > Key: FLINK-9423 > URL: https://issues.apache.org/jira/browse/FLINK-9423 > Project: Flink > Issue Type: Improvement > Components: Streaming > Affects Versions: 1.5.0 > Reporter: Stefan Richter > Assignee: Stefan Richter > Priority: Major > Fix For: 1.6.0 > > > The current data structures in the `HeapInternalTimerService` are not able to > support efficient timer deletes, the complexity is currently O\(n\), where n > is the number of registered timers. > > We can keep track of timer's positions in the priority queue and (in > combination with the already existing set/map) have a more efficient > algorithm for deletes. -- This message was sent by Atlassian JIRA (v7.6.3#76005)