Github user kishoreg commented on a diff in the pull request: https://github.com/apache/helix/pull/159#discussion_r177260018 --- Diff: helix-core/src/main/java/org/apache/helix/common/DedupEventBlockingQueue.java --- @@ -0,0 +1,139 @@ +package org.apache.helix.common; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.BlockingQueue; + +/** + * A blocking queue of events, which automatically deduplicate events with the same "type" within + * the queue, i.e, when putting an event into the queue, if there is already an event with the + * same type existing in the queue, the new event won't be inserted into the queue. + * This class is meant to be a limited implementation of the {@link BlockingQueue} interface. + * + * T -- the Type of an event. + * E -- the event itself. 
+ */ +public class DedupEventBlockingQueue<T, E> { + private final Map<T, Entry<T, E>> _eventMap; + private final Queue<Entry> _eventQueue; + + class Entry <T, E> { + private T _type; + private E _event; + + Entry (T type, E event) { + _type = type; + _event = event; + } + + T getType() { + return _type; + } + + E getEvent() { + return _event; + } + } + + /** + * Instantiate the queue + */ + public DedupEventBlockingQueue() { + _eventMap = Maps.newHashMap(); + _eventQueue = Lists.newLinkedList(); + } + + /** + * Remove all events from the queue + */ + public synchronized void clear() { + _eventMap.clear(); + _eventQueue.clear(); + } + + /** + * Add a single event to the queue, overwriting events with the same name + */ + public synchronized void put(T type, E event) { + Entry entry = new Entry(type, event); + + if (!_eventMap.containsKey(entry.getType())) { + // only insert to the queue if there isn't a same-typed event already present + boolean result = _eventQueue.offer(entry); + if (!result) { + return; + } + } + // always overwrite the existing entry in the map in case the entry is different + _eventMap.put((T) entry.getType(), entry); + notify(); + } + + /** + * Remove an element from the front of the queue, blocking if none is available. This method + * will return the most recent event seen with the oldest enqueued event name. + * @return ClusterEvent at the front of the queue + * @throws InterruptedException if the wait for elements was interrupted + */ + public synchronized E take() throws InterruptedException { + while (_eventQueue.isEmpty()) { + wait(); --- End diff -- Could we use a blocking queue and piggyback on that impl?
---