FLUME-1534. CheckpointRebuilder$ComparableFlumeEventPointer#equal does
not work correctly.

(Hari Shreedharan via Will McQueen)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/26c08938
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/26c08938
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/26c08938

Branch: refs/heads/cdh-1.2.0+24_intuit
Commit: 26c08938776bf7661cb2386b3594037060d71f2d
Parents: 61ebdb2
Author: Will McQueen <[email protected]>
Authored: Sun Sep 2 15:30:21 2012 -0700
Committer: Mike Percy <[email protected]>
Committed: Fri Sep 7 14:03:06 2012 -0700

----------------------------------------------------------------------
 .../flume/channel/file/CheckpointRebuilder.java    |   19 +++++++++++++-
 1 files changed, 17 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/26c08938/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java
 
b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java
index 4db1b9c..32b5324 100644
--- 
a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java
+++ 
b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java
@@ -18,6 +18,7 @@
  */
 package org.apache.flume.channel.file;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.SetMultimap;
@@ -146,9 +147,12 @@ public class CheckpointRebuilder {
     }
     Set<ComparableFlumeEventPointer> sortedPuts =
             Sets.newTreeSet(committedPuts);
+    int count = 0;
     for (ComparableFlumeEventPointer put : sortedPuts) {
       queue.addTail(put.pointer);
+      count++;
     }
+    LOG.info("Replayed {} events using fast replay logic.", count);
     return true;
   }
 
@@ -178,13 +182,15 @@ public class CheckpointRebuilder {
     }
   }
 
-  private class ComparableFlumeEventPointer
+  private final class ComparableFlumeEventPointer
           implements Comparable<ComparableFlumeEventPointer> {
 
     private final FlumeEventPointer pointer;
     private final long orderID;
 
     public ComparableFlumeEventPointer(FlumeEventPointer pointer, long 
orderID){
+      Preconditions.checkNotNull(pointer, "FlumeEventPointer cannot be"
+              + "null while creating a ComparableFlumeEventPointer");
       this.pointer = pointer;
       this.orderID = orderID;
     }
@@ -206,7 +212,16 @@ public class CheckpointRebuilder {
 
     @Override
     public boolean equals(Object o){
-      return pointer.equals(o);
+      if(this == o){
+        return true;
+      }
+      if(o == null){
+        return false;
+      }
+      if(o.getClass() != this.getClass()){
+        return false;
+      }
+      return pointer.equals(((ComparableFlumeEventPointer)o).pointer);
     }
   }
 

Reply via email to