apoorvmittal10 commented on code in PR #19261:
URL: https://github.com/apache/kafka/pull/19261#discussion_r2012532595
##########
server-common/src/main/java/org/apache/kafka/server/storage/log/FetchIsolation.java:
##########
@@ -25,11 +25,11 @@ public enum FetchIsolation {
TXN_COMMITTED;
public static FetchIsolation of(FetchRequest request) {
- return of(request.replicaId(), request.isolationLevel());
+ return of(request.replicaId(), request.isolationLevel(), false);
}
- public static FetchIsolation of(int replicaId, IsolationLevel
isolationLevel) {
- if (!FetchRequest.isConsumer(replicaId)) {
+ public static FetchIsolation of(int replicaId, IsolationLevel
isolationLevel, boolean isShareFetchRequest) {
+ if (!FetchRequest.isConsumer(replicaId) && !isShareFetchRequest) {
Review Comment:
Hmmm, I am not sure if we should have `isShareFetchRequest` as a param.
Shall there be any other replicaId for share consumers?
cc: @junrao
```
public static final int ORDINARY_CONSUMER_ID = -1;
public static final int DEBUGGING_CONSUMER_ID = -2;
public static final int FUTURE_LOCAL_REPLICA_ID = -3;
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java:
##########
@@ -83,6 +92,8 @@ public final class GroupConfig extends AbstractConfig {
public final int streamsNumStandbyReplicas;
+ public final int shareIsolationLevel;
Review Comment:
KIP says `Valid values "read_committed" and "read_uncommitted" (default)`
so why do we have the int value?
##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -104,7 +104,8 @@ class KafkaApis(val requestChannel: RequestChannel,
time: Time,
val tokenManager: DelegationTokenManager,
val apiVersionManager: ApiVersionManager,
- val clientMetricsManager: ClientMetricsManager
+ val clientMetricsManager: ClientMetricsManager,
+ val groupConfigManager: GroupConfigManager
Review Comment:
Though it's fine to have another manager in KafkaApis but would it be better
to have `groupConfigManager` in `SharePartitionManager`? KafkaApis can pass
default IsolationLevel using `FetchIsolation.of(-1,
GroupConfig.defaultShareIsolationLevel, true)` and SharePartitionManager can
check for updated IsolationLevel using below code. The final isolation level
can go in `ShareFetch` class? This approach require less plumbing in KafkaApis
but adds a isolation level param in `ShareFetch` which might be different from
`FetchParams`. What do you think?
```
val isolationLevel: IsolationLevel =
IsolationLevel.forId(groupConfigManager.groupConfig(groupId).get().shareIsolationLevel().toByte)
FetchIsolation.of(-1, isolationLevel, true)
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java:
##########
@@ -65,6 +67,13 @@ public final class GroupConfig extends AbstractConfig {
public static final String STREAMS_NUM_STANDBY_REPLICAS_CONFIG =
"group.streams.num.standby.replicas";
+ public static final String SHARE_ISOLATION_LEVEL_CONFIG =
"share.isolation.level";
+ public static final int SHARE_ISOLATION_LEVEL_DEFAULT =
IsolationLevel.READ_UNCOMMITTED.id();
+ public static final String SHARE_ISOLATION_LEVEL_DOC = "Controls how to
read records written transactionally. " +
+ "If set to \"read_committed\", the share group will only deliver
transactional records which have been committed. " +
+ "If set to \"read_uncommitted\", the share group will return all
messages, even transactional messages which have been aborted. " +
+ "Non-transactional records will be returned unconditionally in either
mode.";
Review Comment:
nit: would be good to align with share group configs above.
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -847,6 +859,7 @@ public ShareAcquiredRecords acquire(
}
Review Comment:
Don't we need the filtering here as well? Do we cover this scenario in unit
test?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]