krummas commented on code in PR #4556:
URL: https://github.com/apache/cassandra/pull/4556#discussion_r2763083713


##########
src/java/org/apache/cassandra/config/DatabaseDescriptor.java:
##########
@@ -1309,6 +1313,12 @@ private static void validateReadThresholds(String name, DataStorageSpec.LongByte
                                                            name + "_fail_threshold", fail,
                                                            name + "_warn_threshold", warn));
     }
+    
+    private static void validateWriteThreshold(String name, int value)
+    {
+        if (value < -1)

Review Comment:
   We should validate that the write thresholds are larger than
   `min_tracked_partition_size` and `min_tracked_partition_tombstone_count`; 
otherwise we might miss some partitions we'd want to warn for.
   
   We should also make sure that `top_partitions_enabled` is true if 
`write_thresholds_enabled` is set (or at least log a warning otherwise).
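
   A minimal sketch of the checks described above, not the patch's actual code. The class, method, and parameter names are hypothetical, `-1` is assumed to mean "disabled" as in the diff, and whether the bound should be strict or inclusive is also an assumption (the sketch only rejects thresholds strictly below the tracked minimum).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the config validation; none of these names come from the patch.
final class WriteThresholdConfigCheck
{
    /**
     * Returns a list of human-readable problems; an empty list means the settings are consistent.
     * A threshold of -1 is treated as "disabled" and is not compared.
     */
    static List<String> validate(boolean writeThresholdsEnabled,
                                 boolean topPartitionsEnabled,
                                 long writeSizeWarnBytes,
                                 long minTrackedPartitionSizeBytes,
                                 long writeTombstoneWarn,
                                 long minTrackedTombstoneCount)
    {
        List<String> problems = new ArrayList<>();
        if (!writeThresholdsEnabled)
            return problems;

        // Write warnings rely on the tracked top partitions, so tracking must be on.
        if (!topPartitionsEnabled)
            problems.add("write_thresholds_enabled is set but top_partitions_enabled is not");

        // Partitions below the tracked minimum are never recorded, so a lower threshold cannot fire for them.
        if (writeSizeWarnBytes != -1 && writeSizeWarnBytes < minTrackedPartitionSizeBytes)
            problems.add("write size warn threshold is below min_tracked_partition_size");

        if (writeTombstoneWarn != -1 && writeTombstoneWarn < minTrackedTombstoneCount)
            problems.add("write tombstone warn threshold is below min_tracked_partition_tombstone_count");

        return problems;
    }
}
```

   The caller can decide whether each problem becomes a configuration exception or only a logged warning.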



##########
src/java/org/apache/cassandra/db/WriteThresholds.java:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DataStorageSpec;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.net.ParamType;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.utils.NoSpamLogger;
+
+/**
+ * Utility class for checking write threshold warnings on replicas.
+ * CASSANDRA-17258: paxos and accord do complex thread hand off and custom 
write logic which makes this patch complex, so was deferred
+ */
+public class WriteThresholds
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(WriteThresholds.class);
+    private static final NoSpamLogger noSpamLogger = 
NoSpamLogger.getLogger(logger, 1, TimeUnit.MINUTES);
+
+    /**
+     * Check write thresholds for all partition updates in a mutation.
+     * This method iterates through all partition updates in the mutation.
+     *
+     * @param mutation the mutation containing one or more partition updates
+     */
+    public static void checkWriteThresholds(Mutation mutation)
+    {
+        if (!DatabaseDescriptor.isDaemonInitialized() || 
!DatabaseDescriptor.getWriteThresholdsEnabled())
+            return;
+
+        DataStorageSpec.LongBytesBound sizeWarnThreshold = 
DatabaseDescriptor.getWriteSizeWarnThreshold();
+        int tombstoneWarnThreshold = 
DatabaseDescriptor.getWriteTombstoneWarnThreshold();
+
+        if (sizeWarnThreshold == null && tombstoneWarnThreshold == -1)
+            return;
+
+        long sizeWarnBytes = sizeWarnThreshold != null ? 
sizeWarnThreshold.toBytes() : -1;
+
+        for (PartitionUpdate update : mutation.getPartitionUpdates())
+        {
+            checkWriteThresholdsInternal(update, update.partitionKey(), 
sizeWarnBytes, tombstoneWarnThreshold);
+        }
+    }
+
+    /**
+     * Internal method to check write thresholds for a single partition update.
+     * This method looks up the partition in TopPartitionTracker and adds
+     * warning params to MessageParams if thresholds are exceeded.
+     *
+     * @param update                 the partition update being written
+     * @param key                    the partition key being written
+     * @param sizeWarnBytes          size threshold in bytes, or -1 if disabled
+     * @param tombstoneWarnThreshold tombstone count threshold, or -1 if 
disabled
+     */
+    private static void checkWriteThresholdsInternal(PartitionUpdate update, 
DecoratedKey key,
+                                                     long sizeWarnBytes, int 
tombstoneWarnThreshold)
+    {
+        TableId tableId = update.metadata().id;
+        ColumnFamilyStore cfs = 
Schema.instance.getColumnFamilyStoreInstance(tableId);
+
+        if (cfs == null || cfs.topPartitions == null)
+            return;
+
+        long estimatedSize = cfs.topPartitions.topSizes().getEstimate(key);
+        long estimatedTombstones = 
cfs.topPartitions.topTombstones().getEstimate(key);
+
+        TableMetadata meta = update.metadata();
+
+        if (sizeWarnBytes != -1 && estimatedSize > sizeWarnBytes)
+        {
+            Number currentValue = MessageParams.get(ParamType.WRITE_SIZE_WARN);
+            long currentLong = currentValue != null ? currentValue.longValue() 
: -1;
+
+            if (currentLong < estimatedSize)
+            {
+                MessageParams.add(ParamType.WRITE_SIZE_WARN, estimatedSize);
+                noSpamLogger.warn("Write to {} partition {} triggered size warning; " +
+                                  "estimated size is {} bytes, threshold is {} bytes (see write_size_warn_threshold)",

Review Comment:
   We should log the keyspace and table here.



##########
src/java/org/apache/cassandra/service/writes/thresholds/CoordinatorWriteWarnings.java:
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.writes.thresholds;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.annotation.Nullable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.IMutation;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.ClientWarn;
+import org.apache.cassandra.service.thresholds.CoordinatorWarningsState;
+import org.apache.cassandra.utils.Pair;
+
+public class CoordinatorWriteWarnings
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(CoordinatorWriteWarnings.class);
+
+    private static final Warnings INIT = new Warnings();
+    private static final Warnings EMPTY = new Warnings();
+
+    private static final CoordinatorWarningsState<Warnings> STATE =
+    new CoordinatorWarningsState<>("CoordinatorWriteWarnings",
+                                   INIT,
+                                   EMPTY,
+                                   Warnings::new,
+                                   () -> EMPTY,
+                                   logger,
+                                   false);
+
+    /**
+     * Initialize coordinator write warnings for this thread. Must be called 
at the start of a client request.
+     */
+    public static void init()
+    {
+        STATE.init();
+    }
+
+    /**
+     * Update warnings for a partition after receiving responses from replicas.
+     *
+     * @param mutation the mutation that was written
+     * @param snapshot the aggregated warnings from replicas
+     */
+    public static void update(IMutation mutation, WriteWarningsSnapshot 
snapshot)
+    {
+        if (snapshot.isEmpty())
+        {

Review Comment:
   nit: no need for braces when there is only a single statement



##########
src/java/org/apache/cassandra/service/writes/thresholds/CoordinatorWriteWarnings.java:
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.writes.thresholds;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.annotation.Nullable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.IMutation;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.ClientWarn;
+import org.apache.cassandra.service.thresholds.CoordinatorWarningsState;
+import org.apache.cassandra.utils.Pair;
+
+public class CoordinatorWriteWarnings
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(CoordinatorWriteWarnings.class);
+
+    private static final Warnings INIT = new Warnings();
+    private static final Warnings EMPTY = new Warnings();
+
+    private static final CoordinatorWarningsState<Warnings> STATE =
+    new CoordinatorWarningsState<>("CoordinatorWriteWarnings",
+                                   INIT,
+                                   EMPTY,
+                                   Warnings::new,
+                                   () -> EMPTY,
+                                   logger,
+                                   false);
+
+    /**
+     * Initialize coordinator write warnings for this thread. Must be called 
at the start of a client request.
+     */
+    public static void init()
+    {
+        STATE.init();
+    }
+
+    /**
+     * Update warnings for a partition after receiving responses from replicas.
+     *
+     * @param mutation the mutation that was written
+     * @param snapshot the aggregated warnings from replicas
+     */
+    public static void update(IMutation mutation, WriteWarningsSnapshot 
snapshot)
+    {
+        if (snapshot.isEmpty())
+        {
+            return;
+        }
+
+        Warnings warnings = STATE.mutable();
+        if (warnings == EMPTY)
+        {
+            return;
+        }
+
+        for (PartitionUpdate update : mutation.getPartitionUpdates())
+        {
+            warnings.merge(update.metadata().id, update.partitionKey(), 
snapshot);
+        }
+    }
+
+    /**
+     * Process accumulated warnings: send to client and update metrics.
+     * Must be called at the end of a client request.
+     */
+    public static void done()
+    {
+        STATE.processAndReset(CoordinatorWriteWarnings::processWarnings);
+    }
+
+    /**
+     * Reset/clear warnings for this thread.
+     */
+    public static void reset()
+    {
+        STATE.reset();
+    }
+
+    private static void processWarnings(Warnings warnings)
+    {
+        if (warnings.partitions == null || warnings.partitions.isEmpty())
+            return;
+
+        for (Map.Entry<Pair<TableId, DecoratedKey>, WriteWarningsSnapshot> 
entry : warnings.partitions.entrySet())
+        {
+            Pair<TableId, DecoratedKey> key = entry.getKey();
+            WriteWarningsSnapshot snapshot = entry.getValue();
+
+            ColumnFamilyStore cfs = 
Schema.instance.getColumnFamilyStoreInstance(key.left);
+            if (cfs == null)
+            {
+                logger.warn("ColumnFamilyStore is null for table {}, 
skipping", key.left);
+                continue;
+            }
+
+            TableMetadata metadata = cfs.metadata();
+            String partitionKey = 
metadata.partitionKeyType.toCQLString(key.right.getKey());
+
+            recordWarnings(partitionKey, cfs, snapshot);
+        }
+    }
+
+    private static void recordWarnings(String partitionKey, ColumnFamilyStore 
cfs, WriteWarningsSnapshot snapshot)
+    {
+        TableMetadata metadata = cfs.metadata();
+        if (!snapshot.writeSize.instances.isEmpty())
+        {
+            String msg = String.format("Write to %s.%s partition %s: %s",
+                                       metadata.keyspace,
+                                       metadata.name,
+                                       partitionKey,
+                                       
WriteWarningsSnapshot.writeSizeWarnMessage(
+                                       snapshot.writeSize.instances.size(),
+                                       snapshot.writeSize.maxValue));
+            ClientWarn.instance.warn(msg);
+            logger.warn(msg);
+            cfs.metric.writeSizeWarnings.mark();
+            cfs.metric.writeSize.update(snapshot.writeSize.maxValue);
+        }
+
+        if (!snapshot.writeTombstone.instances.isEmpty())
+        {
+            String msg = String.format("Write to %s.%s partition %s: %s",
+                                       metadata.keyspace,
+                                       metadata.name,
+                                       partitionKey,
+                                       
WriteWarningsSnapshot.writeTombstoneWarnMessage(
+                                       
snapshot.writeTombstone.instances.size(),
+                                       snapshot.writeTombstone.maxValue));
+            ClientWarn.instance.warn(msg);
+            logger.warn(msg);
+            cfs.metric.writeTombstoneWarnings.mark();
+        }
+    }
+
+    /**
+     * Internal state holder for accumulated warnings.
+     */
+    private static class Warnings
+    {
+        @Nullable
+        Map<Pair<TableId, DecoratedKey>, WriteWarningsSnapshot> partitions;

Review Comment:
   We don't need this `Pair`: a `Mutation` is for a single decorated key, so each 
`PartitionUpdate` in the mutation will have the same key. The map can be 
`Map<TableId, WriteWarningsSnapshot>`.
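
   Roughly what that could look like, as a sketch only. It assumes, as stated above, that every update in a mutation shares one partition key. `UUID` stands in for `TableId`, and the nested `Snapshot` class is a simplified, hypothetical stand-in for `WriteWarningsSnapshot` that tracks just a maximum value.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical sketch: warnings keyed by table only, since the partition key is implied by the mutation.
final class PerTableWarnings
{
    // Simplified stand-in for WriteWarningsSnapshot: keeps only the largest reported value.
    static final class Snapshot
    {
        final long maxValue;

        Snapshot(long maxValue)
        {
            this.maxValue = maxValue;
        }

        Snapshot merge(Snapshot other)
        {
            return other.maxValue > maxValue ? other : this;
        }
    }

    // UUID stands in for TableId.
    private final Map<UUID, Snapshot> byTable = new HashMap<>();

    // Merges the same snapshot into every table touched by the mutation.
    void merge(Iterable<UUID> tableIds, Snapshot snapshot)
    {
        for (UUID tableId : tableIds)
            byTable.merge(tableId, snapshot, Snapshot::merge);
    }

    Snapshot get(UUID tableId)
    {
        return byTable.get(tableId);
    }

    int size()
    {
        return byTable.size();
    }
}
```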



##########
src/java/org/apache/cassandra/service/writes/thresholds/CoordinatorWriteWarnings.java:
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.writes.thresholds;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.annotation.Nullable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.IMutation;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.ClientWarn;
+import org.apache.cassandra.service.thresholds.CoordinatorWarningsState;
+import org.apache.cassandra.utils.Pair;
+
+public class CoordinatorWriteWarnings
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(CoordinatorWriteWarnings.class);
+
+    private static final Warnings INIT = new Warnings();
+    private static final Warnings EMPTY = new Warnings();
+
+    private static final CoordinatorWarningsState<Warnings> STATE =
+    new CoordinatorWarningsState<>("CoordinatorWriteWarnings",
+                                   INIT,
+                                   EMPTY,
+                                   Warnings::new,
+                                   () -> EMPTY,
+                                   logger,
+                                   false);
+
+    /**
+     * Initialize coordinator write warnings for this thread. Must be called 
at the start of a client request.
+     */
+    public static void init()
+    {
+        STATE.init();
+    }
+
+    /**
+     * Update warnings for a partition after receiving responses from replicas.
+     *
+     * @param mutation the mutation that was written
+     * @param snapshot the aggregated warnings from replicas
+     */
+    public static void update(IMutation mutation, WriteWarningsSnapshot 
snapshot)
+    {
+        if (snapshot.isEmpty())
+        {
+            return;
+        }
+
+        Warnings warnings = STATE.mutable();
+        if (warnings == EMPTY)
+        {
+            return;
+        }
+
+        for (PartitionUpdate update : mutation.getPartitionUpdates())

Review Comment:
   It is enough to iterate over `mutation.getTableIds()` here.



##########
src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java:
##########
@@ -91,6 +95,9 @@ public abstract class AbstractWriteResponseHandler<T> implements RequestCallback
     private volatile Map<InetAddressAndPort, RequestFailureReason> failureReasonByEndpoint;
     private final Dispatcher.RequestTime requestTime;
     private @Nullable final Supplier<Mutation> hintOnFailure;
+    private volatile WriteWarningContext warningContext;
+    private static final AtomicReferenceFieldUpdater<AbstractWriteResponseHandler, WriteWarningContext> warningsUpdater
+    = AtomicReferenceFieldUpdater.newUpdater(AbstractWriteResponseHandler.class, WriteWarningContext.class, "warningContext");

Review Comment:
   nit: follow the code style here (see lines 88/89 and 91/92 above)



##########
src/java/org/apache/cassandra/db/WriteThresholds.java:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DataStorageSpec;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.net.ParamType;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.utils.NoSpamLogger;
+
+/**
+ * Utility class for checking write threshold warnings on replicas.
+ * CASSANDRA-17258: paxos and accord do complex thread hand off and custom 
write logic which makes this patch complex, so was deferred
+ */
+public class WriteThresholds
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(WriteThresholds.class);
+    private static final NoSpamLogger noSpamLogger = 
NoSpamLogger.getLogger(logger, 1, TimeUnit.MINUTES);
+
+    /**
+     * Check write thresholds for all partition updates in a mutation.
+     * This method iterates through all partition updates in the mutation.
+     *
+     * @param mutation the mutation containing one or more partition updates
+     */
+    public static void checkWriteThresholds(Mutation mutation)
+    {
+        if (!DatabaseDescriptor.isDaemonInitialized() || 
!DatabaseDescriptor.getWriteThresholdsEnabled())
+            return;
+
+        DataStorageSpec.LongBytesBound sizeWarnThreshold = 
DatabaseDescriptor.getWriteSizeWarnThreshold();
+        int tombstoneWarnThreshold = 
DatabaseDescriptor.getWriteTombstoneWarnThreshold();
+
+        if (sizeWarnThreshold == null && tombstoneWarnThreshold == -1)
+            return;
+
+        long sizeWarnBytes = sizeWarnThreshold != null ? 
sizeWarnThreshold.toBytes() : -1;
+
+        for (PartitionUpdate update : mutation.getPartitionUpdates())

Review Comment:
   Since we only need to check once for each table, I think we can iterate 
over `mutation.getTableIds()` here instead; the number of updated tables is smaller 
than or equal to the total number of updates.
   
   Also, once we have warned for both tombstones and size, we can stop iterating.
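
   A rough sketch of the loop shape this suggests; the names are hypothetical and the estimate lookups are passed in as functions rather than read from `TopPartitionTracker`. Note that stopping early means estimates for the remaining tables are never consulted, so the reported value may not be the maximum across tables.

```java
import java.util.function.ToLongFunction;

// Hypothetical sketch: one check per table id, with an early exit once both warnings have fired.
final class EarlyExitThresholdCheck
{
    static final int SIZE_WARNED = 1;
    static final int TOMBSTONE_WARNED = 2;

    /**
     * Visits each table id at most once and returns a bit mask of the warnings raised.
     * A threshold of -1 disables the corresponding check.
     */
    static <T> int check(Iterable<T> tableIds,
                         ToLongFunction<T> estimatedSize,
                         ToLongFunction<T> estimatedTombstones,
                         long sizeWarnBytes,
                         long tombstoneWarn)
    {
        int warned = 0;
        for (T tableId : tableIds)
        {
            if (sizeWarnBytes != -1 && estimatedSize.applyAsLong(tableId) > sizeWarnBytes)
                warned |= SIZE_WARNED;
            if (tombstoneWarn != -1 && estimatedTombstones.applyAsLong(tableId) > tombstoneWarn)
                warned |= TOMBSTONE_WARNED;

            // Both warnings raised: further tables cannot add a new kind of warning.
            if (warned == (SIZE_WARNED | TOMBSTONE_WARNED))
                break;
        }
        return warned;
    }
}
```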



##########
test/distributed/org/apache/cassandra/distributed/test/thresholds/AbstractWriteThresholdWarning.java:
##########
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.thresholds;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.SimpleStatement;
+
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.ICluster;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
+import org.apache.cassandra.distributed.test.JavaDriverUtils;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Base class for write threshold warning distributed tests.
+ * Tests coordinator-side warning aggregation from replica responses.
+ */
+public abstract class AbstractWriteThresholdWarning extends TestBaseImpl
+{
+    protected static ICluster<IInvokableInstance> CLUSTER;
+    protected static com.datastax.driver.core.Cluster JAVA_DRIVER;
+    protected static com.datastax.driver.core.Session JAVA_DRIVER_SESSION;
+
+    @BeforeClass
+    public static void setupClass() throws IOException
+    {
+        Cluster.Builder builder = Cluster.build(3);
+        builder.withConfig(c -> c.with(Feature.NATIVE_PROTOCOL, 
Feature.GOSSIP));
+        CLUSTER = builder.start();
+        JAVA_DRIVER = JavaDriverUtils.create(CLUSTER);
+        JAVA_DRIVER_SESSION = JAVA_DRIVER.connect();
+    }
+
+    protected abstract long totalWarnings();
+    protected abstract void assertWarnings(List<String> warnings);
+    protected abstract void populateTopPartitions(int pk, long value);
+    protected abstract void clearTopPartitions();

Review Comment:
   The implementations of this method are both empty, so it can be removed.



##########
src/java/org/apache/cassandra/service/writes/thresholds/WriteWarningsSnapshot.java:
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.writes.thresholds;
+
+import java.util.Objects;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.cassandra.service.thresholds.ThresholdCounter;
+
+/**
+ * Immutable snapshot of write warnings. Simpler than WarningsSnapshot since 
writes never abort (warnings only).
+ */
+public class WriteWarningsSnapshot
+{
+    private static final WriteWarningsSnapshot EMPTY = new 
WriteWarningsSnapshot(ThresholdCounter.empty(), ThresholdCounter.empty());
+
+    public final ThresholdCounter writeSize;
+    public final ThresholdCounter writeTombstone;
+
+    private WriteWarningsSnapshot(ThresholdCounter writeSize, ThresholdCounter 
writeTombstone)
+    {
+        this.writeSize = writeSize;
+        this.writeTombstone = writeTombstone;
+    }
+
+    public static WriteWarningsSnapshot empty()
+    {
+        return EMPTY;
+    }
+
+    public static WriteWarningsSnapshot create(ThresholdCounter writeSize, 
ThresholdCounter writeTombstone)
+    {
+        if (writeSize == ThresholdCounter.empty() && writeTombstone == ThresholdCounter.empty())

Review Comment:
   Maybe add a method `isEmpty()` in `ThresholdCounter`?
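
   Something along these lines, as a sketch. `CounterSketch` is a hypothetical stand-in for `ThresholdCounter`; the `instances` and `maxValue` field names follow their use elsewhere in this diff, but the rest of the class shape is assumed. Whether `isEmpty()` should be content-based (as here) or an identity check against the shared empty instance is a design choice.

```java
import java.util.Collections;
import java.util.Set;

// Hypothetical stand-in for ThresholdCounter, reduced to what an isEmpty() check needs.
final class CounterSketch
{
    private static final CounterSketch EMPTY = new CounterSketch(Collections.emptySet(), 0);

    final Set<String> instances;
    final long maxValue;

    CounterSketch(Set<String> instances, long maxValue)
    {
        this.instances = instances;
        this.maxValue = maxValue;
    }

    static CounterSketch empty()
    {
        return EMPTY;
    }

    // Content-based: a distinct counter with no instances also counts as empty,
    // so callers do not depend on reference equality with the shared EMPTY instance.
    boolean isEmpty()
    {
        return instances.isEmpty();
    }
}
```

   With this in place, the condition in `create` could read `writeSize.isEmpty() && writeTombstone.isEmpty()`.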



##########
src/java/org/apache/cassandra/service/writes/thresholds/WriteWarningsSnapshot.java:
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.writes.thresholds;
+
+import java.util.Objects;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.cassandra.service.thresholds.ThresholdCounter;
+
+/**
+ * Immutable snapshot of write warnings. Simpler than WarningsSnapshot since 
writes never abort (warnings only).
+ */
+public class WriteWarningsSnapshot
+{
+    private static final WriteWarningsSnapshot EMPTY = new 
WriteWarningsSnapshot(ThresholdCounter.empty(), ThresholdCounter.empty());
+
+    public final ThresholdCounter writeSize;
+    public final ThresholdCounter writeTombstone;
+
+    private WriteWarningsSnapshot(ThresholdCounter writeSize, ThresholdCounter 
writeTombstone)
+    {
+        this.writeSize = writeSize;
+        this.writeTombstone = writeTombstone;
+    }
+
+    public static WriteWarningsSnapshot empty()
+    {
+        return EMPTY;
+    }
+
+    public static WriteWarningsSnapshot create(ThresholdCounter writeSize, 
ThresholdCounter writeTombstone)
+    {
+        if (writeSize == ThresholdCounter.empty() && writeTombstone == 
ThresholdCounter.empty())
+            return EMPTY;
+        return new WriteWarningsSnapshot(writeSize, writeTombstone);
+    }
+
+    public boolean isEmpty()
+    {
+        return this == EMPTY;
+    }
+
+    public WriteWarningsSnapshot merge(WriteWarningsSnapshot other)
+    {
+        if (other == null || other == EMPTY)
+            return this;
+        return WriteWarningsSnapshot.create(

Review Comment:
   I think the recommended code style here is something like:
   ```
           return WriteWarningsSnapshot.create(writeSize.merge(other.writeSize),
                                               writeTombstone.merge(other.writeTombstone));
   ```



##########
src/java/org/apache/cassandra/service/WriteResponseHandler.java:
##########
@@ -58,6 +61,12 @@ public WriteResponseHandler(ReplicaPlan.ForWrite replicaPlan, WriteType writeTyp
 
     public void onResponse(Message<T> m)
     {
+        if (m != null)

Review Comment:
   Looks like `m == null` indicates a response from the local host. Should we 
instead assume `from` = `FBUtilities.getBroadcastAddressAndPort()` if `m == null` 
in this block?
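
   A small sketch of that fallback, with hypothetical stand-in types: `Msg` replaces `Message<T>`, `InetSocketAddress` replaces `InetAddressAndPort`, and the local address is passed in instead of coming from `FBUtilities.getBroadcastAddressAndPort()`.

```java
import java.net.InetSocketAddress;

// Hypothetical sketch: treat a null message as a response from the local node.
final class ResponseOrigin
{
    // Stand-in for Message<T>; only the sender matters here.
    static final class Msg
    {
        final InetSocketAddress from;

        Msg(InetSocketAddress from)
        {
            this.from = from;
        }
    }

    /** Returns the sender of the message, or the local address when the message is null. */
    static InetSocketAddress resolve(Msg m, InetSocketAddress localAddress)
    {
        return m != null ? m.from : localAddress;
    }
}
```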



##########
src/java/org/apache/cassandra/service/StorageProxy.java:
##########
@@ -2023,7 +2026,23 @@ public void runMayThrow()
             {
                 try
                 {
+                    MessageParams.reset();
+
+                    boolean trackWriteWarnings = description instanceof Mutation && handler instanceof AbstractWriteResponseHandler;

Review Comment:
   We should probably also check whether 
`DatabaseDescriptor.getWriteThresholdsEnabled()` is true here, to avoid the 
slightly expensive block below.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

