Copilot commented on code in PR #2606:
URL: https://github.com/apache/fluss/pull/2606#discussion_r2781844478
##########
fluss-client/src/main/java/org/apache/fluss/client/write/IdempotenceManager.java:
##########
@@ -307,6 +312,9 @@ void maybeWaitForWriterId(Set<PhysicalTablePath> tablePaths) throws Throwable {
if (t instanceof AuthorizationException || retryCount >= RETRY_TIMES) {
throw t;
} else {
+ if (t instanceof InvalidMetadataException) {
+ metadataUpdater.updatePhysicalTableMetadata(tablePaths);
+ }
Review Comment:
This new behavior (refreshing metadata on `InvalidMetadataException` during
writer-id initialization) is not covered by tests. Adding a
Sender/IdempotenceManager test that injects an `InvalidMetadataException` from
`initWriter` and asserts `MetadataUpdater.updatePhysicalTableMetadata(...)` is
invoked would prevent regressions and validate the fix for stuck jobs.
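A minimal sketch of the requested regression test, assuming a simplified model of the retry loop. `PhysicalMetadataUpdater`, `RecordingMetadataUpdater`, `WriterIdInitSketch` and the stand-in `InvalidMetadataException` are hypothetical and only mirror the shape of the diff; a real test would drive `Sender`/`IdempotenceManager` with the actual `MetadataUpdater` (or a spy of it).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified stand-ins for the Fluss types touched by this PR.
// They only model the retry loop in maybeWaitForWriterId; the real classes differ.
class InvalidMetadataException extends RuntimeException {
    InvalidMetadataException(String message) {
        super(message);
    }
}

interface PhysicalMetadataUpdater {
    void updatePhysicalTableMetadata(Set<String> tablePaths);
}

/** Records every refresh request so the test can assert on it. */
class RecordingMetadataUpdater implements PhysicalMetadataUpdater {
    final List<Set<String>> calls = new ArrayList<>();

    @Override
    public void updatePhysicalTableMetadata(Set<String> tablePaths) {
        calls.add(tablePaths);
    }
}

class WriterIdInitSketch {
    static final int RETRY_TIMES = 3;

    interface InitWriter {
        long initWriter();
    }

    /** Retries initWriter, refreshing metadata first whenever it is reported stale. */
    static long initWithRetry(
            InitWriter rpc, PhysicalMetadataUpdater updater, Set<String> tablePaths) {
        int retryCount = 0;
        while (true) {
            try {
                return rpc.initWriter();
            } catch (RuntimeException t) {
                if (retryCount >= RETRY_TIMES) {
                    throw t;
                }
                if (t instanceof InvalidMetadataException) {
                    updater.updatePhysicalTableMetadata(tablePaths);
                }
                retryCount++;
            }
        }
    }

    /** The regression test the comment asks for, in miniature. */
    static boolean refreshesOnInvalidMetadata() {
        RecordingMetadataUpdater updater = new RecordingMetadataUpdater();
        Set<String> tables = Set.of("db.orders");
        int[] attempts = {0};
        long writerId =
                initWithRetry(
                        () -> {
                            // Fail once with stale metadata, then succeed.
                            if (attempts[0]++ == 0) {
                                throw new InvalidMetadataException("stale leader");
                            }
                            return 42L;
                        },
                        updater,
                        tables);
        return writerId == 42L
                && updater.calls.size() == 1
                && updater.calls.get(0).equals(tables);
    }
}
```

The fake RPC fails once with stale metadata and then succeeds, so the check covers both that exactly one refresh happened and that it targeted the same table set.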
##########
fluss-common/src/main/java/org/apache/fluss/cluster/ServerType.java:
##########
@@ -20,5 +20,14 @@
/** The type of server in Fluss cluster. */
public enum ServerType {
COORDINATOR,
- TABLET_SERVER
+ TABLET_SERVER;
+
+ public static ServerType forId(int id) {
+ if (id == 0) {
+ return COORDINATOR;
+ } else if (id == 1) {
+ return TABLET_SERVER;
+ }
+ throw new IllegalArgumentException("Unknown server type id: " + id);
Review Comment:
`forId` is introduced, but the inverse mapping (ServerType -> id) is
duplicated elsewhere using hard-coded values. Add a single authoritative
mapping (e.g., `id()`/`toId()` on the enum) and reuse it to prevent drift if
server types are extended in the future.
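One way to give the enum a single authoritative mapping, sketched under the assumption that the wire ids stay 0 and 1 as in this PR; `toId()` is the hypothetical name from the comment. Deriving `forId` from `values()` keeps both directions in one place.

```java
/**
 * Sketch of a single authoritative id mapping. toId() is a hypothetical name;
 * the ids match the 0/1 encoding used by this PR.
 */
enum ServerType {
    COORDINATOR(0),
    TABLET_SERVER(1);

    private final int id;

    ServerType(int id) {
        this.id = id;
    }

    /** Wire id written into ApiVersionsResponse.server_type. */
    public int toId() {
        return id;
    }

    /** Inverse of toId(); derived from values() so the two directions cannot drift. */
    public static ServerType forId(int id) {
        for (ServerType type : values()) {
            if (type.id == id) {
                return type;
            }
        }
        throw new IllegalArgumentException("Unknown server type id: " + id);
    }
}
```

Call sites would then use `response.setServerType(provider.toId())` on the server and `ServerType.forId(...)` on the client, instead of repeating `0`/`1`.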
##########
fluss-rpc/src/main/proto/FlussApi.proto:
##########
@@ -40,6 +40,7 @@ message ApiVersionsRequest {
message ApiVersionsResponse {
repeated PbApiVersion api_versions = 1;
Review Comment:
`server_type` is encoded as a bare int but there is no inline documentation
of the allowed values. Add a short comment documenting the mapping (e.g.,
0=COORDINATOR, 1=TABLET_SERVER) and how unknown values should be treated for
forward compatibility.
```suggestion
repeated PbApiVersion api_versions = 1;
// server_type encodes the logical role of the server:
// 0 = COORDINATOR, 1 = TABLET_SERVER. For forward compatibility, clients
// MUST gracefully handle unknown values (for example, by treating them as
// an unknown/generic server type) rather than failing.
```
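On the client side, the forward-compatibility rule in the suggested proto comment could look like the sketch below. `ServerTypeCodec` and `decode` are hypothetical names, and returning an empty `Optional` for unrecognised values is one possible policy, not something the PR defines.

```java
import java.util.Optional;

/** Tolerant decoding of the bare server_type int (hypothetical helper). */
final class ServerTypeCodec {
    enum ServerType {
        COORDINATOR,
        TABLET_SERVER
    }

    private ServerTypeCodec() {}

    /**
     * Returns the known role for a wire value, or empty for values this client
     * does not recognise (e.g. a role added by a newer server), instead of throwing.
     */
    static Optional<ServerType> decode(int serverTypeId) {
        switch (serverTypeId) {
            case 0:
                return Optional.of(ServerType.COORDINATOR);
            case 1:
                return Optional.of(ServerType.TABLET_SERVER);
            default:
                return Optional.empty();
        }
    }
}
```

A caller receiving an empty result could, for example, skip the server-type check and log the raw value rather than failing the handshake.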
##########
fluss-client/src/main/java/org/apache/fluss/client/write/IdempotenceManager.java:
##########
@@ -307,6 +312,9 @@ void maybeWaitForWriterId(Set<PhysicalTablePath> tablePaths) throws Throwable {
if (t instanceof AuthorizationException || retryCount >= RETRY_TIMES) {
throw t;
} else {
+ if (t instanceof InvalidMetadataException) {
+ metadataUpdater.updatePhysicalTableMetadata(tablePaths);
Review Comment:
`metadataUpdater` is dereferenced without a null check, but some call sites
construct `IdempotenceManager` with a null updater (e.g., tests). This creates
a fragile implicit contract that no `InvalidMetadataException` will occur on
this path. Either enforce non-null with a precondition (especially when
idempotence is enabled) or make the field @Nullable and guard the update call
accordingly.
```suggestion
if (metadataUpdater != null) {
    metadataUpdater.updatePhysicalTableMetadata(tablePaths);
} else {
    LOG.warn(
            "Received InvalidMetadataException but metadataUpdater is null; "
                    + "skipping metadata refresh for tables: {}",
            tablePaths);
}
```
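For the other option in this comment (enforcing non-null up front), a minimal sketch, assuming idempotence is the only path that reaches the refresh call. `MetadataRefresher` and `IdempotenceGuardSketch` are hypothetical stand-ins for `MetadataUpdater` and `IdempotenceManager`.

```java
import java.util.Objects;

/** Hypothetical stand-in for MetadataUpdater. */
interface MetadataRefresher {
    void refresh();
}

/** Hypothetical stand-in for IdempotenceManager, reduced to the null-handling question. */
final class IdempotenceGuardSketch {
    private final MetadataRefresher metadataUpdater;

    IdempotenceGuardSketch(boolean idempotenceEnabled, MetadataRefresher metadataUpdater) {
        // Fail fast at construction instead of with a NullPointerException deep
        // inside the writer-id retry loop.
        if (idempotenceEnabled) {
            Objects.requireNonNull(
                    metadataUpdater,
                    "metadataUpdater must not be null when idempotence is enabled");
        }
        this.metadataUpdater = metadataUpdater;
    }

    /** Only reachable when idempotence is enabled, so the updater is known to be non-null. */
    void onInvalidMetadata() {
        metadataUpdater.refresh();
    }
}
```

Failing in the constructor surfaces a misconfigured call site immediately, whereas the `@Nullable` plus guard approach in the suggestion skips the refresh and relies on the warning log.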
##########
fluss-common/src/main/java/org/apache/fluss/exception/InvalidServerTypeException.java:
##########
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.exception;
+
+/** Exception thrown when a request is sent to a server of an invalid type. */
+public class InvalidServerTypeException extends InvalidMetadataException {
+ public InvalidServerTypeException(String message) {
+ super(message);
+ }
Review Comment:
Most exception types in this package are annotated as public API
(`@PublicEvolving`) and declare `serialVersionUID`. Consider adding those here
for consistency and serialization stability, and optionally provide
constructors that accept a cause (mirroring other `InvalidMetadataException`
subclasses).
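A sketch of what the class could look like with the suggested additions. The `InvalidMetadataException` here is a local stand-in so the snippet compiles on its own, and it assumes the real superclass exposes a `(String, Throwable)` constructor; `@PublicEvolving` is left as a comment because its import is not shown in this diff.

```java
// Local stand-in for org.apache.fluss.exception.InvalidMetadataException so the
// sketch compiles on its own; assumes the real class offers both constructors.
class InvalidMetadataException extends RuntimeException {
    private static final long serialVersionUID = 1L;

    InvalidMetadataException(String message) {
        super(message);
    }

    InvalidMetadataException(String message, Throwable cause) {
        super(message, cause);
    }
}

// In the real file this class would also carry @PublicEvolving, like its siblings.
/** Exception thrown when a request is sent to a server of an invalid type. */
class InvalidServerTypeException extends InvalidMetadataException {
    private static final long serialVersionUID = 1L;

    public InvalidServerTypeException(String message) {
        super(message);
    }

    /** Keeps the underlying failure, e.g. an unknown server type id. */
    public InvalidServerTypeException(String message, Throwable cause) {
        super(message, cause);
    }
}
```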
##########
fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/ServerConnectionTest.java:
##########
@@ -196,6 +201,55 @@ void testConnectionMetrics() throws ExecutionException, InterruptedException {
connection.close().get();
}
+ @Test
+ void testWrongServerType() {
+ ServerNode wrongServerTypeNode =
+ new ServerNode(
+ serverNode.id(),
+ serverNode.host(),
+ serverNode.port(),
+ ServerType.COORDINATOR);
+ CountDownLatch countDownLatch = new CountDownLatch(1);
+ Bootstrap mockBootstrap =
+ new Bootstrap() {
+ @Override
+ public ChannelFuture connect(String host, int port) {
+ return bootstrap
+ .connect(host, port)
+ .addListener(f -> countDownLatch.await());
+ }
Review Comment:
Blocking the Netty event loop thread with `countDownLatch.await()` (and
without any timeout) can make this test hang indefinitely if something goes
wrong before the latch is released. Use an `await` with a reasonable timeout
and fail the test if it expires, or use a non-blocking synchronization
mechanism that doesn't park the event loop.
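A minimal bounded-wait helper for the test, assuming a hypothetical `BoundedLatch.awaitOrFail` name. It relies on `CountDownLatch.await(long, TimeUnit)`, which returns `false` when the timeout elapses before the count reaches zero.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical test helper: a bounded wait so a broken test fails instead of hanging. */
final class BoundedLatch {
    private BoundedLatch() {}

    static void awaitOrFail(CountDownLatch latch, long timeout, TimeUnit unit) {
        try {
            // await(long, TimeUnit) returns false if the count did not reach
            // zero before the timeout elapsed.
            if (!latch.await(timeout, unit)) {
                throw new AssertionError(
                        "Latch not released within " + timeout + " " + unit);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag before failing.
            Thread.currentThread().interrupt();
            throw new AssertionError("Interrupted while waiting for latch", e);
        }
    }
}
```

In the test's `connect` override the listener would call `BoundedLatch.awaitOrFail(countDownLatch, 10, TimeUnit.SECONDS)`. This bounds the hang but still parks the event loop for up to the timeout, and Netty generally logs rather than propagates exceptions thrown from future listeners, so the test thread should still assert on the connection outcome itself.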
##########
fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java:
##########
@@ -198,6 +198,7 @@ public CompletableFuture<ApiVersionsResponse> apiVersions(ApiVersionsRequest req
}
ApiVersionsResponse response = new ApiVersionsResponse();
response.addAllApiVersions(apiVersions);
+ response.setServerType(provider == ServerType.COORDINATOR ? 0 : 1);
Review Comment:
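`server_type` is currently set via hard-coded numeric IDs (0/1). To avoid
magic numbers and keep the encoding/decoding consistent, consider adding a
`toId()` (or similar) method on `ServerType` and using it here (and in other
`setServerType` call sites). Note that `ordinal()`, used in the suggestion
below, yields the same 0/1 values today only because of the enum's
declaration order.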
```suggestion
response.setServerType(provider.ordinal());
```
##########
fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java:
##########
@@ -372,6 +374,19 @@ private void handleApiVersionsResponse(ApiMessage response, Throwable cause) {
return;
}
+ if (((ApiVersionsResponse) response).hasServerType()) {
+ ServerType serverType =
+ ServerType.forId(((ApiVersionsResponse) response).getServerType());
+ if (serverType != node.serverType()) {
+ LOG.warn(
+ "Server type mismatch, expected: {}, actual: {}",
+ node.serverType(),
+ serverType);
+ close(new InvalidServerTypeException("Server type mismatch"));
Review Comment:
`ServerType.forId(...)` can throw (e.g., unknown/new server type id). If
that happens inside this `whenComplete` handler, the connection may remain
stuck in CONNECTING with queued requests never flushed and without a clean
close. Catch parsing failures here and close the connection with a
deterministic exception (e.g.,
InvalidServerTypeException/IllegalStateException) so pending requests complete
promptly.
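A self-contained model of the suggested handling, assuming a `close(Throwable)` that fails all pending requests (modelled here by `closeFuture`). `HandshakeSketch`, `checkServerType` and the nested types are hypothetical; the point is that an unknown id takes the same deterministic close path as a mismatch instead of escaping the `whenComplete` callback.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical model of the ApiVersions server-type check; not the real ServerConnection. */
final class HandshakeSketch {
    enum ServerType {
        COORDINATOR,
        TABLET_SERVER;

        static ServerType forId(int id) {
            if (id == 0) {
                return COORDINATOR;
            } else if (id == 1) {
                return TABLET_SERVER;
            }
            throw new IllegalArgumentException("Unknown server type id: " + id);
        }
    }

    static final class InvalidServerTypeException extends RuntimeException {
        InvalidServerTypeException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    private final ServerType expected;

    /** Completed exceptionally on close; pending requests would fail with this cause. */
    final CompletableFuture<Void> closeFuture = new CompletableFuture<>();

    HandshakeSketch(ServerType expected) {
        this.expected = expected;
    }

    void close(Throwable cause) {
        closeFuture.completeExceptionally(cause);
    }

    /** Returns true if the connection may proceed; otherwise it has been closed. */
    boolean checkServerType(int serverTypeId) {
        ServerType actual;
        try {
            actual = ServerType.forId(serverTypeId);
        } catch (IllegalArgumentException e) {
            // Unknown id: close deterministically instead of leaving the
            // connection stuck in CONNECTING with queued requests.
            close(new InvalidServerTypeException(
                    "Unknown server type id " + serverTypeId + ", expected: " + expected, e));
            return false;
        }
        if (actual != expected) {
            close(new InvalidServerTypeException(
                    "Server type mismatch, expected: " + expected + ", actual: " + actual, null));
            return false;
        }
        return true;
    }
}
```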
##########
fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java:
##########
@@ -372,6 +374,19 @@ private void handleApiVersionsResponse(ApiMessage response, Throwable cause) {
return;
}
+ if (((ApiVersionsResponse) response).hasServerType()) {
+ ServerType serverType =
+ ServerType.forId(((ApiVersionsResponse) response).getServerType());
+ if (serverType != node.serverType()) {
+ LOG.warn(
+ "Server type mismatch, expected: {}, actual: {}",
+ node.serverType(),
+ serverType);
+ close(new InvalidServerTypeException("Server type mismatch"));
Review Comment:
The thrown `InvalidServerTypeException` message is currently just "Server
type mismatch" while the log contains expected/actual. Consider including
expected and actual server types (and/or the node) in the exception message so
callers/tests have actionable context without needing server logs.
```suggestion
close(
new InvalidServerTypeException(
"Server type mismatch, expected: "
+ node.serverType()
+ ", actual: "
+ serverType
+ ", node: "
+ node));
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.