This is an automated email from the ASF dual-hosted git repository.

smiklosovic pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b2ccd0f  ensure hint window is persistent across restarts of a node
b2ccd0f is described below

commit b2ccd0f3f588a34cd68222bdacd1914478914ac9
Author: kurt <k...@instaclustr.com>
AuthorDate: Tue Feb 20 03:49:33 2018 +0000

    ensure hint window is persistent across restarts of a node
    
    patch by Kurt Greaves; reviewed by Brandon Williams, Mick Semb Wever and 
Stefan Miklosovic for CASSANDRA-14309
---
 CHANGES.txt                                        |  1 +
 NEWS.txt                                           | 10 +++++
 conf/cassandra.yaml                                | 14 +++++++
 doc/source/configuration/cass_yaml_file.rst        | 15 +++++++
 src/java/org/apache/cassandra/config/Config.java   |  1 +
 .../cassandra/config/DatabaseDescriptor.java       |  5 +++
 .../org/apache/cassandra/hints/HintsBuffer.java    | 26 ++++++++++++
 .../apache/cassandra/hints/HintsBufferPool.java    | 17 ++++++++
 .../org/apache/cassandra/hints/HintsService.java   | 17 +++++++-
 .../org/apache/cassandra/hints/HintsStore.java     |  8 ++++
 .../apache/cassandra/hints/HintsWriteExecutor.java | 28 +++++++++++--
 .../org/apache/cassandra/service/StorageProxy.java | 34 +++++++++++++--
 .../apache/cassandra/hints/HintsBufferTest.java    | 47 +++++++++++++++++++++
 .../apache/cassandra/hints/HintsServiceTest.java   | 48 ++++++++++++++++++++++
 14 files changed, 262 insertions(+), 9 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 0103be2..3208ff8 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.1
+ * Ensure hint window is persistent across restarts of a node (CASSANDRA-14309)
  * Allow to GRANT or REVOKE multiple permissions in a single statement 
(CASSANDRA-17030)
  * Allow to grant permission for all tables in a keyspace (CASSANDRA-17027)
  * Log time spent writing keys during compaction (CASSANDRA-17037)
diff --git a/NEWS.txt b/NEWS.txt
index 5c48685..7850b09 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -54,6 +54,16 @@ New features
       (track_warnings.local_read_size.warn_threshold_kb and 
track_warnings.local_read_size.abort_threshold_kb),
       and RowIndexEntry estimated memory size 
(track_warnings.row_index_size.warn_threshold_kb and
       track_warnings.row_index_size.abort_threshold_kb) are supported; more 
checks will be added over time.
+    - Prior to this version, the hint system was storing a window of hints as 
defined by
+      configuration property max_hint_window_in_ms, however this window was not 
persistent across restarts.
+      For example, if a node is restarted, it will still be eligible for a 
hint to be sent to it because it
+      was down less than max_hint_window_in_ms. Hence if that node continues 
restarting without hint delivery completing,
+      hints will be sent to that node indefinitely which would occupy more and 
more disk space.
+      This behaviour was changed in CASSANDRA-14309. From now on, by default, 
if a node is not down longer than
+      max_hint_window_in_ms, there is an additional check to see if there is a 
hint to be delivered which is older
+      than max_hint_window_in_ms. If there is, a hint is not persisted. If there is 
not, it is.
+      The previous behaviour can be restored by setting the property
+      hint_window_persistent_enabled to false. This property is set to true by default.
 
 Upgrading
 ---------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index a9efa8d..87df25a 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -98,6 +98,20 @@ auto_hints_cleanup_enabled: false
 #     parameters:
 #         -
 
+# Enable / disable persistent hint windows.
+#
+# If set to false, a hint will be stored only in case a respective node
+# that hint is for is down less than or equal to max_hint_window_in_ms.
+#
+# If set to true, a hint will be stored only if there is no hint for that
+# node which was stored more than max_hint_window_in_ms ago. This covers the
+# case when a node keeps restarting before its hints are delivered; without
+# this check we would be saving hints for that node indefinitely.
+#
+# Defaults to true.
+#
+# hint_window_persistent_enabled: true
+
 # Maximum throttle in KBs per second, total. This will be
 # reduced proportionally to the number of nodes in the cluster.
 batchlog_replay_throttle_in_kb: 1024
diff --git a/doc/source/configuration/cass_yaml_file.rst 
b/doc/source/configuration/cass_yaml_file.rst
index 8a14336..09b5cb4 100644
--- a/doc/source/configuration/cass_yaml_file.rst
+++ b/doc/source/configuration/cass_yaml_file.rst
@@ -150,6 +150,21 @@ are supported.
     #     parameters:
     #         -
 
+``hint_window_persistent_enabled``
+----------------------------------
+
+*This option is commented out by default.*
+
+If set to false, a hint will be stored only in case a respective node
+that hint is for is down less than or equal to max_hint_window_in_ms.
+
+If set to true, a hint will be stored only if there is no hint for that
+node which was stored more than max_hint_window_in_ms ago. This covers the
+case when a node keeps restarting before its hints are delivered; without
+this check we would be saving hints for that node indefinitely.
+
+*Default Value:* true
+
 ``batchlog_replay_throttle_in_kb``
 ----------------------------------
 Maximum throttle in KBs per second, total. This will be
diff --git a/src/java/org/apache/cassandra/config/Config.java 
b/src/java/org/apache/cassandra/config/Config.java
index 3b574ac..b9b5975 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -74,6 +74,7 @@ public class Config
     public Set<String> hinted_handoff_disabled_datacenters = 
Sets.newConcurrentHashSet();
     public volatile int max_hint_window_in_ms = 3 * 3600 * 1000; // three hours
     public String hints_directory;
+    public boolean hint_window_persistent_enabled = true;
 
     public ParameterizedClass seed_provider;
     public DiskAccessMode disk_access_mode = DiskAccessMode.auto;
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java 
b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 5195646..7a3164a 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -2514,6 +2514,11 @@ public class DatabaseDescriptor
         return new File(conf.hints_directory);
     }
 
+    public static boolean hintWindowPersistentEnabled()
+    {
+        return conf.hint_window_persistent_enabled;
+    }
+
     public static File getSerializedCachePath(CacheType cacheType, String 
version, String extension)
     {
         String name = cacheType.toString()
diff --git a/src/java/org/apache/cassandra/hints/HintsBuffer.java 
b/src/java/org/apache/cassandra/hints/HintsBuffer.java
index d944b4d..e86ce26 100644
--- a/src/java/org/apache/cassandra/hints/HintsBuffer.java
+++ b/src/java/org/apache/cassandra/hints/HintsBuffer.java
@@ -26,11 +26,13 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.zip.CRC32;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.io.util.DataOutputBufferFixed;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.AbstractIterator;
+import org.apache.cassandra.utils.Clock;
 import org.apache.cassandra.utils.concurrent.OpOrder;
 
 import static org.apache.cassandra.utils.FBUtilities.updateChecksum;
@@ -58,6 +60,7 @@ final class HintsBuffer
 
     private final ConcurrentMap<UUID, Queue<Integer>> offsets;
     private final OpOrder appendOrder;
+    private final ConcurrentMap<UUID, Long> earliestHintByHost; // Stores time 
of the earliest hint in the buffer for each host
 
     private HintsBuffer(ByteBuffer slab)
     {
@@ -66,6 +69,7 @@ final class HintsBuffer
         position = new AtomicLong();
         offsets = new ConcurrentHashMap<>();
         appendOrder = new OpOrder();
+        earliestHintByHost = new ConcurrentHashMap<>();
     }
 
     static HintsBuffer create(int slabSize)
@@ -141,6 +145,21 @@ final class HintsBuffer
         };
     }
 
+    /**
+     * Retrieve the time of the earliest hint in the buffer for a specific node
+     * @param hostId UUID of the node
+     * @return timestamp for the earliest hint in the buffer, or the current time
+     *         ({@code Clock.Global.currentTimeMillis()}) if the buffer holds no hint for this node
+     */
+    long getEarliestHintTime(UUID hostId)
+    {
+        return earliestHintByHost.getOrDefault(hostId, 
Clock.Global.currentTimeMillis());
+    }
+
+    void clearEarliestHintForHostId(UUID hostId)
+    {
+        earliestHintByHost.remove(hostId);
+    }
+
     @SuppressWarnings("resource")
     Allocation allocate(int hintSize)
     {
@@ -222,8 +241,15 @@ final class HintsBuffer
         void write(Iterable<UUID> hostIds, Hint hint)
         {
             write(hint);
+            long ts = Clock.Global.currentTimeMillis();
             for (UUID hostId : hostIds)
+            {
+                // We only need the time of the first hint in the buffer
+                if (DatabaseDescriptor.hintWindowPersistentEnabled())
+                    earliestHintByHost.putIfAbsent(hostId, ts);
+
                 put(hostId, offset);
+            }
         }
 
         public void close()
diff --git a/src/java/org/apache/cassandra/hints/HintsBufferPool.java 
b/src/java/org/apache/cassandra/hints/HintsBufferPool.java
index 8d1db8d..78f07dd 100644
--- a/src/java/org/apache/cassandra/hints/HintsBufferPool.java
+++ b/src/java/org/apache/cassandra/hints/HintsBufferPool.java
@@ -18,6 +18,7 @@
 package org.apache.cassandra.hints;
 
 import java.io.Closeable;
+import java.util.Iterator;
 import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
 
@@ -65,6 +66,22 @@ final class HintsBufferPool implements Closeable
         }
     }
 
+    /**
+     * Get the earliest hint for a specific node from all buffers
+     * @param hostId UUID of the node
+     * @return timestamp for the earliest hint
+     */
+    long getEarliestHintForHost(UUID hostId)
+    {
+        long min = currentBuffer().getEarliestHintTime(hostId);
+        Iterator<HintsBuffer> it = reserveBuffers.iterator();
+
+        while (it.hasNext())
+            min = Math.min(min, it.next().getEarliestHintTime(hostId));
+
+        return min;
+    }
+
     private HintsBuffer.Allocation allocate(int hintSize)
     {
         HintsBuffer current = currentBuffer();
diff --git a/src/java/org/apache/cassandra/hints/HintsService.java 
b/src/java/org/apache/cassandra/hints/HintsService.java
index 3fcc00e..f3957a1 100644
--- a/src/java/org/apache/cassandra/hints/HintsService.java
+++ b/src/java/org/apache/cassandra/hints/HintsService.java
@@ -35,6 +35,7 @@ import com.google.common.collect.ImmutableMap;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.io.util.File;
 import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.utils.Clock;
 import org.apache.cassandra.utils.concurrent.Future;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -193,7 +194,7 @@ public final class HintsService implements HintsServiceMBean
         // judicious use of streams: eagerly materializing probably cheaper
         // than performing filters / translations 2x extra via 
Iterables.filter/transform
         List<UUID> hostIds = replicas.stream()
-                .filter(StorageProxy::shouldHint)
+                .filter(replica -> StorageProxy.shouldHint(replica, false))
                 .map(replica -> 
StorageService.instance.getHostIdForEndpoint(replica.endpoint()))
                 .collect(Collectors.toList());
 
@@ -428,6 +429,20 @@ public final class HintsService implements 
HintsServiceMBean
         return dispatchExecutor.transfer(catalog, hostIdSupplier);
     }
 
+    /**
+     * Get the time of the earliest hint written for a particular node.
+     * @param hostId UUID of the node to check its hints.
+     * @return time of the earliest hint in milliseconds since the epoch, or the current
+     *         time if no hint is currently stored or buffered for the node
+     */
+    public long getEarliestHintForHost(UUID hostId)
+    {
+        // Need to check only the first descriptor + all buffers.
+        HintsStore store = catalog.get(hostId);
+        HintsDescriptor desc = store.getFirstDescriptor();
+        long timestamp = desc == null ? Clock.Global.currentTimeMillis() : 
desc.timestamp;
+        return Math.min(timestamp, bufferPool.getEarliestHintForHost(hostId));
+    }
+
     HintsCatalog getCatalog()
     {
         return catalog;
diff --git a/src/java/org/apache/cassandra/hints/HintsStore.java 
b/src/java/org/apache/cassandra/hints/HintsStore.java
index 29e79bf..bc2fee7 100644
--- a/src/java/org/apache/cassandra/hints/HintsStore.java
+++ b/src/java/org/apache/cassandra/hints/HintsStore.java
@@ -247,6 +247,14 @@ final class HintsStore
         corruptedFiles.add(descriptor);
     }
 
+    /**
+     * @return the first {@link HintsDescriptor} in the queue for dispatch,
+     *         or {@code null} if the queue is empty.
+     */
+    HintsDescriptor getFirstDescriptor()
+    {
+        return dispatchDequeue.peekFirst();
+    }
+
     /*
      * Methods dealing with HintsWriter.
      *
diff --git a/src/java/org/apache/cassandra/hints/HintsWriteExecutor.java 
b/src/java/org/apache/cassandra/hints/HintsWriteExecutor.java
index c4bfff0..8089d61 100644
--- a/src/java/org/apache/cassandra/hints/HintsWriteExecutor.java
+++ b/src/java/org/apache/cassandra/hints/HintsWriteExecutor.java
@@ -194,7 +194,7 @@ final class HintsWriteExecutor
         {
             HintsBuffer buffer = bufferPool.currentBuffer();
             buffer.waitForModifications();
-            stores.forEach(store -> 
flush(buffer.consumingHintsIterator(store.hostId), store));
+            stores.forEach(store -> 
flush(buffer.consumingHintsIterator(store.hostId), store, buffer));
         }
     }
 
@@ -216,10 +216,10 @@ final class HintsWriteExecutor
 
     private void flush(HintsBuffer buffer)
     {
-        buffer.hostIds().forEach(hostId -> 
flush(buffer.consumingHintsIterator(hostId), catalog.get(hostId)));
+        buffer.hostIds().forEach(hostId -> 
flush(buffer.consumingHintsIterator(hostId), catalog.get(hostId), buffer));
     }
 
-    private void flush(Iterator<ByteBuffer> iterator, HintsStore store)
+    private void flush(Iterator<ByteBuffer> iterator, HintsStore store, 
HintsBuffer buffer)
     {
         while (true)
         {
@@ -231,7 +231,27 @@ final class HintsWriteExecutor
 
             // exceeded the size limit for an individual file, but still have 
more to write
             // close the current writer and continue flushing to a new one in 
the next iteration
-            store.closeWriter();
+            try
+            {
+                store.closeWriter();
+            }
+            finally
+            {
+                /*
+                We remove the earliest hint for a respective hostId of the 
store from the buffer,
+                we are removing it specifically after we closed the store 
above in try block
+                so hints are persisted on disk before.
+
+                There is a periodic flushing of a buffer driven by 
hints_flush_period_in_ms and clearing
+                this entry upon every flush would remove the information what 
is the earliest hint in the buffer
+                for a respective node prematurely.
+
+                Since this flushing method is called for every host id a 
buffer holds, we will eventually
+                remove all hostIds of the earliest hints of the buffer, and it 
will be added again as soon as there
+                is a new hint for that node to be delivered.
+                */
+                buffer.clearEarliestHintForHostId(store.hostId);
+            }
         }
     }
 
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java 
b/src/java/org/apache/cassandra/service/StorageProxy.java
index c02a77d..c8495dc 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -44,6 +44,8 @@ import com.google.common.cache.CacheLoader;
 import com.google.common.collect.Iterables;
 import com.google.common.primitives.Ints;
 import com.google.common.util.concurrent.Uninterruptibles;
+
+import org.apache.cassandra.utils.Clock;
 import org.apache.cassandra.utils.concurrent.CountDownLatch;
 
 import org.slf4j.Logger;
@@ -2192,6 +2194,11 @@ public class StorageProxy implements StorageProxyMBean
 
     public static boolean shouldHint(Replica replica)
     {
+        return shouldHint(replica, true);
+    }
+
+    public static boolean shouldHint(Replica replica, boolean 
tryEnablePersistentWindow)
+    {
         if (!DatabaseDescriptor.hintedHandoffEnabled())
             return false;
         if (replica.isTransient() || replica.isSelf())
@@ -2207,12 +2214,31 @@ public class StorageProxy implements StorageProxyMBean
                 return false;
             }
         }
-        boolean hintWindowExpired = 
Gossiper.instance.getEndpointDowntime(replica.endpoint()) > 
DatabaseDescriptor.getMaxHintWindow();
-        if (hintWindowExpired)
+
+        InetAddressAndPort endpoint = replica.endpoint();
+        int maxHintWindow = DatabaseDescriptor.getMaxHintWindow();
+        long endpointDowntime = 
Gossiper.instance.getEndpointDowntime(endpoint);
+        boolean hintWindowExpired = endpointDowntime > maxHintWindow;
+
+        if (tryEnablePersistentWindow && !hintWindowExpired && 
DatabaseDescriptor.hintWindowPersistentEnabled())
         {
-            HintsService.instance.metrics.incrPastWindow(replica.endpoint());
-            Tracing.trace("Not hinting {} which has been down {} ms", replica, 
Gossiper.instance.getEndpointDowntime(replica.endpoint()));
+            UUID hostIdForEndpoint = 
StorageService.instance.getHostIdForEndpoint(endpoint);
+            if (hostIdForEndpoint != null)
+            {
+                long earliestHint = 
HintsService.instance.getEarliestHintForHost(hostIdForEndpoint);
+                hintWindowExpired = Clock.Global.currentTimeMillis() - 
maxHintWindow > earliestHint;
+                if (hintWindowExpired)
+                    Tracing.trace("Not hinting {} for which there is the 
earliest hint stored at {}", replica, earliestHint);
+            }
+        }
+        else if (hintWindowExpired)
+        {
+            Tracing.trace("Not hinting {} which has been down {} ms", replica, 
endpointDowntime);
         }
+
+        if (hintWindowExpired)
+            HintsService.instance.metrics.incrPastWindow(replica.endpoint());
+
         return !hintWindowExpired;
     }
 
diff --git a/test/unit/org/apache/cassandra/hints/HintsBufferTest.java 
b/test/unit/org/apache/cassandra/hints/HintsBufferTest.java
index 53c1c77..3020fb9 100644
--- a/test/unit/org/apache/cassandra/hints/HintsBufferTest.java
+++ b/test/unit/org/apache/cassandra/hints/HintsBufferTest.java
@@ -26,6 +26,7 @@ import java.util.zip.CRC32;
 import com.google.common.collect.Iterables;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.runner.RunWith;
 
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.concurrent.NamedThreadFactory;
@@ -40,12 +41,16 @@ import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.utils.Clock;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
 
 import static junit.framework.Assert.*;
 
 import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
 import static org.apache.cassandra.utils.FBUtilities.updateChecksum;
 
+@RunWith(BMUnitRunner.class)
 public class HintsBufferTest
 {
     private static final String KEYSPACE = "hints_buffer_test";
@@ -157,6 +162,48 @@ public class HintsBufferTest
         buffer.free();
     }
 
+    static volatile long timestampForHint = 0;
+    // BM rule to get the timestamp that was used to store the hint so that we 
avoid any flakiness in timestamps between
+    // when we send the hint and when it actually got written.
+    @Test
+    @BMRule(name = "GetHintTS",
+            targetClass="HintsBuffer$Allocation",
+            targetMethod="write(Iterable, Hint)",
+            targetLocation="AFTER INVOKE putIfAbsent",
+            
action="org.apache.cassandra.hints.HintsBufferTest.timestampForHint = $ts")
+    public void testEarliestHintTime()
+    {
+        int hintSize = (int) Hint.serializer.serializedSize(createHint(0, 
Clock.Global.currentTimeMillis()), MessagingService.current_version);
+        int entrySize = hintSize + HintsBuffer.ENTRY_OVERHEAD_SIZE;
+        // allocate a slab to fit 10 hints
+        int slabSize = entrySize * 10;
+
+        // use a fixed timestamp base for all mutation timestamps
+        long baseTimestamp = Clock.Global.currentTimeMillis();
+
+        HintsBuffer buffer = HintsBuffer.create(slabSize);
+        UUID uuid = UUID.randomUUID();
+        // Track the first hints time
+        try (HintsBuffer.Allocation allocation = buffer.allocate(hintSize))
+        {
+            Hint hint = createHint(100, baseTimestamp);
+            allocation.write(Collections.singleton(uuid), hint);
+        }
+        long oldestHintTime = timestampForHint;
+
+        // Write some more hints to ensure we actually test getting the 
earliest
+        for (int i = 0; i < 9; i++)
+        {
+            try (HintsBuffer.Allocation allocation = buffer.allocate(hintSize))
+            {
+                Hint hint = createHint(i, baseTimestamp);
+                allocation.write(Collections.singleton(uuid), hint);
+            }
+        }
+        long earliest = buffer.getEarliestHintTime(uuid);
+        assertEquals(oldestHintTime, earliest);
+    }
+
     private static int validateEntry(UUID hostId, ByteBuffer buffer, long 
baseTimestamp, UUID[] load) throws IOException
     {
         CRC32 crc = new CRC32();
diff --git a/test/unit/org/apache/cassandra/hints/HintsServiceTest.java 
b/test/unit/org/apache/cassandra/hints/HintsServiceTest.java
index dd0eb5a..10a7040 100644
--- a/test/unit/org/apache/cassandra/hints/HintsServiceTest.java
+++ b/test/unit/org/apache/cassandra/hints/HintsServiceTest.java
@@ -17,6 +17,8 @@
  */
 package org.apache.cassandra.hints;
 
+import java.util.Collections;
+import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -29,9 +31,13 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.runner.RunWith;
 
 import com.datastax.driver.core.utils.MoreFutures;
 import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.metrics.StorageMetrics;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.net.MockMessagingService;
@@ -40,12 +46,16 @@ import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.StorageService;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
 
 import static org.apache.cassandra.hints.HintsTestUtil.MockFailureDetector;
 import static org.apache.cassandra.hints.HintsTestUtil.sendHintsAndResponses;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 
+@RunWith(BMUnitRunner.class)
 public class HintsServiceTest
 {
     private static final String KEYSPACE = "hints_service_test";
@@ -173,4 +183,42 @@ public class HintsServiceTest
         assertTrue(dispatchOffset != null);
         assertTrue(((ChecksummedDataInput.Position) 
dispatchOffset).sourcePosition > 0);
     }
+
+    // BM rule to get the timestamp that was used to store the hint so that we 
avoid any flakiness in timestamps between
+    // when we send the hint and when it actually got written.
+    static volatile long timestampForHint = 0L;
+    @Test
+    @BMRule(name = "GetHintTS",
+    targetClass="HintsBuffer$Allocation",
+    targetMethod="write(Iterable, Hint)",
+    targetLocation="AFTER INVOKE putIfAbsent",
+    action="org.apache.cassandra.hints.HintsServiceTest.timestampForHint = 
$ts")
+    public void testEarliestHint() throws InterruptedException
+    {
+        // create and write noOfHints using service
+        UUID hostId = StorageService.instance.getLocalHostUUID();
+        TableMetadata metadata = Schema.instance.getTableMetadata(KEYSPACE, 
TABLE);
+
+        long ts = System.currentTimeMillis();
+        DecoratedKey dkey = Util.dk(String.valueOf(1));
+        PartitionUpdate.SimpleBuilder builder = 
PartitionUpdate.simpleBuilder(metadata, dkey).timestamp(ts);
+        builder.row("column0").add("val", "value0");
+        Hint hint = Hint.create(builder.buildAsMutation(), ts);
+        HintsService.instance.write(hostId, hint);
+        long oldestHintTime = timestampForHint;
+        Thread.sleep(1);
+        HintsService.instance.write(hostId, hint);
+        Thread.sleep(1);
+        HintsService.instance.write(hostId, hint);
+
+        // Close and fsync so that we get the timestamp from the descriptor 
rather than the buffer.
+        HintsStore store = HintsService.instance.getCatalog().get(hostId);
+        
HintsService.instance.flushAndFsyncBlockingly(Collections.singletonList(hostId));
+        store.closeWriter();
+
+        long earliest = HintsService.instance.getEarliestHintForHost(hostId);
+        assertEquals(oldestHintTime, earliest);
+        assertNotEquals(oldestHintTime, timestampForHint);
+    }
+
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to