dcapwell commented on code in PR #4556: URL: https://github.com/apache/cassandra/pull/4556#discussion_r2729739085
########## src/java/org/apache/cassandra/service/thresholds/CoordinatorWarningsState.java: ########## @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.service.thresholds; + +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import javax.annotation.Nullable; + +import org.slf4j.Logger; + +import io.netty.util.concurrent.FastThreadLocal; + +/** + * Generic ThreadLocal state manager for coordinator warnings. + * Provides lifecycle management (init, update, done, reset) for accumulating + * warnings across a client request. + * + * @param <S> the type of state to manage (e.g., Map, custom holder class) + */ +public class CoordinatorWarningsState<S> +{ + private final FastThreadLocal<S> threadLocal; + private final S initSentinel; + private final S emptySentinel; + private final Supplier<S> stateFactory; + private final Logger logger; + private final String name; + private final boolean enableDefensiveChecks; + + /** + * Creates a new coordinator warnings state manager. + * + * @param name descriptive name for logging + * @param initSentinel sentinel value indicating initialized but empty state + * @param emptySentinel sentinel value indicating cleared state + * @param stateFactory factory to create new mutable state instances + * @param logger logger for diagnostic messages + * @param enableDefensiveChecks whether to enable defensive checks + */ + public CoordinatorWarningsState(String name, + S initSentinel, + @Nullable S emptySentinel, + Supplier<S> stateFactory, + Logger logger, + boolean enableDefensiveChecks) + { + this.name = name; + this.initSentinel = initSentinel; + this.emptySentinel = emptySentinel; + this.stateFactory = stateFactory; + this.logger = logger; + this.enableDefensiveChecks = enableDefensiveChecks; + this.threadLocal = new FastThreadLocal<>(); + } + + /** + * Initialize state for this thread. + * Must be called at the start of a client request. + */ + public void init() + { + if (logger.isTraceEnabled()) + logger.trace("{}.init()", name); + + S current = threadLocal.get(); + if (current != null && enableDefensiveChecks) + { + throw new AssertionError(name + ".init called while state is not null: " + current); + } + threadLocal.set(initSentinel); + } + + /** + * Reset/clear state for this thread. + */ + public void reset() + { + if (logger.isTraceEnabled()) + logger.trace("{}.reset()", name); + + if (emptySentinel != null) + threadLocal.set(emptySentinel); + else + threadLocal.remove(); + } + + /** + * Get current state, creating mutable state if needed. + * Transitions from initSentinel to mutable state on first access. + * + * @param fallbackSupplier supplies fallback state if not initialized + * @return current mutable state + */ + public S getMutableState(Supplier<S> fallbackSupplier) + { + S state = threadLocal.get(); + + if (state == null) + { + if (enableDefensiveChecks) + throw new AssertionError(name + " accessing state without calling .init() first"); + return fallbackSupplier.get(); + } + + if (state == initSentinel) + { + state = stateFactory.get(); + threadLocal.set(state); + } + + return state; + } + + /** + * Check if state is initialized and not empty. + */ + public boolean isActive() + { + S state = threadLocal.get(); + return state != null && state != initSentinel && state != emptySentinel; + } + + /** + * Process accumulated state and reset. + * Typical pattern for done() implementations. + * + * @param stateChecker checks if state should be processed (e.g., isEmpty()) + * @param processor processes the state + * @param errorHandler handles any errors during processing + */ + public void processAndReset(StateChecker<S> stateChecker, Review Comment: this seems like overkill... do we really need `StateChecker` and `Consumer`? Can't we just have a `Consumer`? read: ``` map -> map != INIT && !map.isEmpty(), CoordinatorWarnings::processWarnings, ``` `processWarnings` is more than capable to handle this check? write: ``` w -> w.partitions != null && !w.partitions.isEmpty(), CoordinatorWriteWarnings::processWarnings, ``` same, `processWarnings` can handle this Our style guide calls out its not good to over abstract away; fine the smallest interface needed for your problem. ########## test/distributed/org/apache/cassandra/distributed/impl/CoordinatorHelper.java: ########## @@ -74,21 +74,26 @@ public static SimpleQueryResult unsafeExecuteInternal(String query, ConsistencyL ProtocolVersion.CURRENT, null), requestTime); - // Collect warnings reported during the query. Review Comment: why remove this comment? ########## test/distributed/org/apache/cassandra/distributed/impl/CoordinatorHelper.java: ########## @@ -74,21 +74,26 @@ public static SimpleQueryResult unsafeExecuteInternal(String query, ConsistencyL ProtocolVersion.CURRENT, null), requestTime); - // Collect warnings reported during the query. CoordinatorWarnings.done(); + CoordinatorWriteWarnings.done(); + List<String> warnings = ClientWarn.instance.getWarnings(); Review Comment: why are you changing this logic? ########## src/java/org/apache/cassandra/service/thresholds/CoordinatorWarningsState.java: ########## @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.service.thresholds; + +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import javax.annotation.Nullable; + +import org.slf4j.Logger; + +import io.netty.util.concurrent.FastThreadLocal; + +/** + * Generic ThreadLocal state manager for coordinator warnings. + * Provides lifecycle management (init, update, done, reset) for accumulating + * warnings across a client request. + * + * @param <S> the type of state to manage (e.g., Map, custom holder class) + */ +public class CoordinatorWarningsState<S> +{ + private final FastThreadLocal<S> threadLocal; + private final S initSentinel; + private final S emptySentinel; + private final Supplier<S> stateFactory; + private final Logger logger; + private final String name; + private final boolean enableDefensiveChecks; + + /** + * Creates a new coordinator warnings state manager. + * + * @param name descriptive name for logging + * @param initSentinel sentinel value indicating initialized but empty state + * @param emptySentinel sentinel value indicating cleared state + * @param stateFactory factory to create new mutable state instances + * @param logger logger for diagnostic messages + * @param enableDefensiveChecks whether to enable defensive checks + */ + public CoordinatorWarningsState(String name, + S initSentinel, + @Nullable S emptySentinel, + Supplier<S> stateFactory, + Logger logger, + boolean enableDefensiveChecks) + { + this.name = name; + this.initSentinel = initSentinel; + this.emptySentinel = emptySentinel; + this.stateFactory = stateFactory; + this.logger = logger; + this.enableDefensiveChecks = enableDefensiveChecks; + this.threadLocal = new FastThreadLocal<>(); + } + + /** + * Initialize state for this thread. + * Must be called at the start of a client request. + */ + public void init() + { + if (logger.isTraceEnabled()) + logger.trace("{}.init()", name); + + S current = threadLocal.get(); + if (current != null && enableDefensiveChecks) + { + throw new AssertionError(name + ".init called while state is not null: " + current); + } + threadLocal.set(initSentinel); + } + + /** + * Reset/clear state for this thread. + */ + public void reset() + { + if (logger.isTraceEnabled()) + logger.trace("{}.reset()", name); + + if (emptySentinel != null) + threadLocal.set(emptySentinel); + else + threadLocal.remove(); + } + + /** + * Get current state, creating mutable state if needed. + * Transitions from initSentinel to mutable state on first access. + * + * @param fallbackSupplier supplies fallback state if not initialized + * @return current mutable state + */ + public S getMutableState(Supplier<S> fallbackSupplier) + { + S state = threadLocal.get(); + + if (state == null) + { + if (enableDefensiveChecks) + throw new AssertionError(name + " accessing state without calling .init() first"); + return fallbackSupplier.get(); + } + + if (state == initSentinel) + { + state = stateFactory.get(); + threadLocal.set(state); + } + + return state; + } + + /** + * Check if state is initialized and not empty. + */ + public boolean isActive() + { + S state = threadLocal.get(); + return state != null && state != initSentinel && state != emptySentinel; + } + + /** + * Process accumulated state and reset. + * Typical pattern for done() implementations. + * + * @param stateChecker checks if state should be processed (e.g., isEmpty()) + * @param processor processes the state + * @param errorHandler handles any errors during processing + */ + public void processAndReset(StateChecker<S> stateChecker, + Consumer<S> processor, + BiConsumer<S, Exception> errorHandler) Review Comment: do we really need the `errorHandler`? Its fine to say "dont mess up" and require each caller to be safe; we can simplify our logic. If write has unsafe logic, it can do the try/catch ########## test/distributed/org/apache/cassandra/distributed/impl/CoordinatorHelper.java: ########## @@ -58,10 +59,9 @@ public static SimpleQueryResult unsafeExecuteInternal(String query, ConsistencyL prepared.validate(QueryState.forInternalCalls().getClientState()); prepared.validate(clientState); - // Start capturing warnings on this thread. Note that this will implicitly clear out any previous Review Comment: why remove this comment? ########## test/distributed/org/apache/cassandra/distributed/impl/RowUtil.java: ########## @@ -44,28 +43,33 @@ public static SimpleQueryResult toQueryResult(ResultMessage res) return toQueryResult(res, true); } + public static SimpleQueryResult toQueryResult(ResultMessage res, List<String> explicitWarnings) Review Comment: can we revert this file? This change isn't needed, and if it is it feels like its trying to work around a bug. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]

