Updated Branches:
  refs/heads/flume-1.3.0 a4763a7d7 -> 6b8c8d70b

FLUME-1534. CheckpointRebuilder$ComparableFlumeEventPointer#equals does
not work correctly.

(Hari Shreedharan via Will McQueen)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/6b8c8d70
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/6b8c8d70
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/6b8c8d70

Branch: refs/heads/flume-1.3.0
Commit: 6b8c8d70b773535e9815b6323e9b0bcbdf82ccd1
Parents: a4763a7
Author: Will McQueen <[email protected]>
Authored: Sun Sep 2 15:30:21 2012 -0700
Committer: Will McQueen <[email protected]>
Committed: Sun Sep 2 15:47:13 2012 -0700

----------------------------------------------------------------------
 .../flume/channel/file/CheckpointRebuilder.java    |   19 +++++++++++++-
 1 files changed, 17 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/6b8c8d70/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java
index 4db1b9c..32b5324 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java
@@ -18,6 +18,7 @@
  */
 package org.apache.flume.channel.file;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.SetMultimap;
@@ -146,9 +147,12 @@ public class CheckpointRebuilder {
     }
     Set<ComparableFlumeEventPointer> sortedPuts =
             Sets.newTreeSet(committedPuts);
+    int count = 0;
     for (ComparableFlumeEventPointer put : sortedPuts) {
       queue.addTail(put.pointer);
+      count++;
     }
+    LOG.info("Replayed {} events using fast replay logic.", count);
     return true;
   }
 
@@ -178,13 +182,15 @@ public class CheckpointRebuilder {
     }
   }
 
-  private class ComparableFlumeEventPointer
+  private final class ComparableFlumeEventPointer
           implements Comparable<ComparableFlumeEventPointer> {
 
     private final FlumeEventPointer pointer;
     private final long orderID;
 
     public ComparableFlumeEventPointer(FlumeEventPointer pointer, long orderID){
+      Preconditions.checkNotNull(pointer, "FlumeEventPointer cannot be"
+              + "null while creating a ComparableFlumeEventPointer");
       this.pointer = pointer;
       this.orderID = orderID;
     }
@@ -206,7 +212,16 @@ public class CheckpointRebuilder {
 
     @Override
     public boolean equals(Object o){
-      return pointer.equals(o);
+      if(this == o){
+        return true;
+      }
+      if(o == null){
+        return false;
+      }
+      if(o.getClass() != this.getClass()){
+        return false;
+      }
+      return pointer.equals(((ComparableFlumeEventPointer)o).pointer);
     }
   }
 

Reply via email to