FLUME-1534. CheckpointRebuilder$ComparableFlumeEventPointer#equals does not work correctly.
(Hari Shreedharan via Will McQueen) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/26c08938 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/26c08938 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/26c08938 Branch: refs/heads/cdh-1.2.0+24_intuit Commit: 26c08938776bf7661cb2386b3594037060d71f2d Parents: 61ebdb2 Author: Will McQueen <[email protected]> Authored: Sun Sep 2 15:30:21 2012 -0700 Committer: Mike Percy <[email protected]> Committed: Fri Sep 7 14:03:06 2012 -0700 ---------------------------------------------------------------------- .../flume/channel/file/CheckpointRebuilder.java | 19 +++++++++++++- 1 files changed, 17 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/26c08938/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java index 4db1b9c..32b5324 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java @@ -18,6 +18,7 @@ */ package org.apache.flume.channel.file; +import com.google.common.base.Preconditions; import com.google.common.collect.HashMultimap; import com.google.common.collect.Lists; import com.google.common.collect.SetMultimap; @@ -146,9 +147,12 @@ public class CheckpointRebuilder { } Set<ComparableFlumeEventPointer> sortedPuts = Sets.newTreeSet(committedPuts); + int count = 0; for (ComparableFlumeEventPointer put : sortedPuts) { 
queue.addTail(put.pointer); + count++; } + LOG.info("Replayed {} events using fast replay logic.", count); return true; } @@ -178,13 +182,15 @@ public class CheckpointRebuilder { } } - private class ComparableFlumeEventPointer + private final class ComparableFlumeEventPointer implements Comparable<ComparableFlumeEventPointer> { private final FlumeEventPointer pointer; private final long orderID; public ComparableFlumeEventPointer(FlumeEventPointer pointer, long orderID){ + Preconditions.checkNotNull(pointer, "FlumeEventPointer cannot be" + + "null while creating a ComparableFlumeEventPointer"); this.pointer = pointer; this.orderID = orderID; } @@ -206,7 +212,16 @@ public class CheckpointRebuilder { @Override public boolean equals(Object o){ - return pointer.equals(o); + if(this == o){ + return true; + } + if(o == null){ + return false; + } + if(o.getClass() != this.getClass()){ + return false; + } + return pointer.equals(((ComparableFlumeEventPointer)o).pointer); } }
