[GitHub] [kafka] vamossagar12 commented on a diff in pull request #12297: KAFKA-13846: Follow up PR to address review comments
vamossagar12 commented on code in PR #12297: URL: https://github.com/apache/kafka/pull/12297#discussion_r916462553 ## docs/upgrade.html: ## @@ -19,6 +19,56 @@
[jira] [Updated] (KAFKA-13152) Replace "buffered.records.per.partition" with "input.buffer.max.bytes"
[ https://issues.apache.org/jira/browse/KAFKA-13152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-13152: Labels: kip (was: needs-kip) > Replace "buffered.records.per.partition" with "input.buffer.max.bytes" > --- > > Key: KAFKA-13152 > URL: https://issues.apache.org/jira/browse/KAFKA-13152 > Project: Kafka > Issue Type: Improvement > Components: streams > Reporter: Guozhang Wang > Assignee: Sagar Rao > Priority: Major > Labels: kip > > The current config "buffered.records.per.partition" controls the maximum number of records to bookkeep per partition; once it is exceeded, we pause fetching from that partition. However, this config has two issues: > * It's a per-partition config, so the total memory consumed depends on the dynamic number of partitions assigned. > * Record size can vary from case to case. > Hence it's hard to bound the memory usage for this buffering. We should consider deprecating that config in favor of a global one, e.g. "input.buffer.max.bytes", which controls how many bytes in total are allowed to be buffered. This is doable since we buffer the raw records in . -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Reopened] (KAFKA-13152) Replace "buffered.records.per.partition" with "input.buffer.max.bytes"
[ https://issues.apache.org/jira/browse/KAFKA-13152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax reopened KAFKA-13152: - > Replace "buffered.records.per.partition" with "input.buffer.max.bytes" > --- > > Key: KAFKA-13152 > URL: https://issues.apache.org/jira/browse/KAFKA-13152 > Project: Kafka > Issue Type: Improvement > Components: streams > Reporter: Guozhang Wang > Assignee: Sagar Rao > Priority: Major > Labels: needs-kip > Fix For: 3.3.0 > > > The current config "buffered.records.per.partition" controls the maximum number of records to bookkeep per partition; once it is exceeded, we pause fetching from that partition. However, this config has two issues: > * It's a per-partition config, so the total memory consumed depends on the dynamic number of partitions assigned. > * Record size can vary from case to case. > Hence it's hard to bound the memory usage for this buffering. We should consider deprecating that config in favor of a global one, e.g. "input.buffer.max.bytes", which controls how many bytes in total are allowed to be buffered. This is doable since we buffer the raw records in . -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-13152) Replace "buffered.records.per.partition" with "input.buffer.max.bytes"
[ https://issues.apache.org/jira/browse/KAFKA-13152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-13152: Fix Version/s: (was: 3.3.0) > Replace "buffered.records.per.partition" with "input.buffer.max.bytes" > --- > > Key: KAFKA-13152 > URL: https://issues.apache.org/jira/browse/KAFKA-13152 > Project: Kafka > Issue Type: Improvement > Components: streams > Reporter: Guozhang Wang > Assignee: Sagar Rao > Priority: Major > Labels: needs-kip > > The current config "buffered.records.per.partition" controls the maximum number of records to bookkeep per partition; once it is exceeded, we pause fetching from that partition. However, this config has two issues: > * It's a per-partition config, so the total memory consumed depends on the dynamic number of partitions assigned. > * Record size can vary from case to case. > Hence it's hard to bound the memory usage for this buffering. We should consider deprecating that config in favor of a global one, e.g. "input.buffer.max.bytes", which controls how many bytes in total are allowed to be buffered. This is doable since we buffer the raw records in . -- This message was sent by Atlassian Jira (v8.20.10#820010)
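For readers following this thread: the proposal above replaces a per-partition record count with a global byte bound. Below is a minimal sketch of how an application would set such a bound, assuming the proposed config name "input.buffer.max.bytes" from KIP-770. Note that a KIP-770 revert appears elsewhere in this digest, so treat both the name and its availability as assumptions rather than a released API.

{code:java}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class InputBufferBoundExample {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "input-buffer-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Assumed config name from the proposal; it would bound the total bytes
        // buffered across all assigned partitions, replacing the per-partition
        // record count of "buffered.records.per.partition".
        props.put("input.buffer.max.bytes", String.valueOf(512L * 1024 * 1024));
        // props would then be passed to new KafkaStreams(topology, props).
    }
}
{code}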
[GitHub] [kafka] guozhangwang commented on a diff in pull request #12370: KAFKA-13785: [10/N][emit final] more unit test for session store and disable cache for emit final sliding window
guozhangwang commented on code in PR #12370: URL: https://github.com/apache/kafka/pull/12370#discussion_r916385802 ## streams/src/test/java/org/apache/kafka/streams/state/internals/InMemorySessionStoreTest.java: ## @@ -55,6 +55,10 @@ SessionStore buildSessionStore(final long retentionPeriod, valueSerde).build(); } +StoreType getStoreType() { Review Comment: Sounds great! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang opened a new pull request, #12391: KAFKA-10199: Add task updater metrics
guozhangwang opened a new pull request, #12391: URL: https://github.com/apache/kafka/pull/12391 Should only be reviewed after https://github.com/apache/kafka/pull/12387. Needs discussion on KIPs. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lihaosky commented on a diff in pull request #12370: KAFKA-13785: [10/N][emit final] more unit test for session store and disable cache for emit final sliding window
lihaosky commented on code in PR #12370: URL: https://github.com/apache/kafka/pull/12370#discussion_r916381888 ## streams/src/test/java/org/apache/kafka/streams/state/internals/InMemorySessionStoreTest.java: ## @@ -55,6 +55,10 @@ SessionStore buildSessionStore(final long retentionPeriod, valueSerde).build(); } +StoreType getStoreType() { Review Comment: For `shouldRemoveExpired`, the RocksDB version has different params for the session window. For `shouldNotExpireFromOpenIterator`, it's not supported in RocksDBSessionStore. For `shouldMatchPositionAfterPut`, the RocksDB version has a different cast than `InMemorySessionStore`. So we could put `shouldRemoveExpired` and `shouldMatchPositionAfterPut` in the parent class and add different logic depending on the store type. We can also leave it here. wdyt? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
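A minimal sketch of the refactoring lihaosky describes above: keep one test in the abstract parent class and branch on the store type. The enum constants and the empty branch bodies are illustrative, not the actual test code:

```java
import org.junit.Test;

abstract class AbstractSessionBytesStoreTest {
    enum StoreType { RocksDBSessionStore, InMemorySessionStore }

    // Hook mirroring the getStoreType() method added in the diff above.
    abstract StoreType getStoreType();

    @Test
    public void shouldRemoveExpired() {
        if (getStoreType() == StoreType.RocksDBSessionStore) {
            // exercise the RocksDB-specific session-window retention params here
        } else {
            // exercise the in-memory store path here
        }
        // shared assertions would follow here
    }
}
```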
[GitHub] [kafka] ijuma commented on pull request #12380: MINOR: Get rid of agent checks in Jenkinsfile
ijuma commented on PR #12380: URL: https://github.com/apache/kafka/pull/12380#issuecomment-1178420987 +1 for removing these builds from PRs. Generally, we should only include widely used platforms where we also have stable build machines. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on pull request #12370: KAFKA-13785: [10/N][emit final] more unit test for session store and disable cache for emit final sliding window
guozhangwang commented on PR #12370: URL: https://github.com/apache/kafka/pull/12370#issuecomment-1178363643 @lihaosky Thanks for your PR, I made a pass on it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a diff in pull request #12370: KAFKA-13785: [10/N][emit final] more unit test for session store and disable cache for emit final sliding window
guozhangwang commented on code in PR #12370: URL: https://github.com/apache/kafka/pull/12370#discussion_r916291502 ## streams/src/test/java/org/apache/kafka/streams/state/internals/InMemorySessionStoreTest.java: ## @@ -55,6 +55,10 @@ SessionStore buildSessionStore(final long retentionPeriod, valueSerde).build(); } +StoreType getStoreType() { Review Comment: Just wondering, since we are leveraging the inherited abstract session bytes store test, are there any test cases below that can be moved there or even are duplicates? ## streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java: ## @@ -827,6 +827,139 @@ public void shouldPutAndBackwardFetchWithPrefix() { } } +@Test +public void shouldFetchSessionForSingleKey() { +// Only for TimeFirstSessionKeySchema schema +if (!(getBaseSchema() instanceof TimeFirstSessionKeySchema)) { +return; +} + +final String keyA = "a"; +final String keyB = "b"; +final String keyC = "c"; + +final StateSerdes<String, Long> stateSerdes = StateSerdes.withBuiltinTypes("dummy", String.class, Long.class); +final Bytes key1 = Bytes.wrap(stateSerdes.keySerializer().serialize("dummy", keyA)); +final Bytes key2 = Bytes.wrap(stateSerdes.keySerializer().serialize("dummy", keyB)); +final Bytes key3 = Bytes.wrap(stateSerdes.keySerializer().serialize("dummy", keyC)); + +final byte[] expectedValue1 = serializeValue(10); +final byte[] expectedValue2 = serializeValue(50); +final byte[] expectedValue3 = serializeValue(100); +final byte[] expectedValue4 = serializeValue(200); + +bytesStore.put(serializeKey(new Windowed<>(keyA, windows[0])), expectedValue1); +bytesStore.put(serializeKey(new Windowed<>(keyA, windows[1])), expectedValue2); +bytesStore.put(serializeKey(new Windowed<>(keyB, windows[2])), expectedValue3); +bytesStore.put(serializeKey(new Windowed<>(keyC, windows[3])), expectedValue4); + +final byte[] value1 = ((RocksDBTimeOrderedSessionSegmentedBytesStore) bytesStore).fetchSession( Review Comment: Could we also add a "miss" case for `fetchSession` here? ## streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java: ## @@ -62,6 +57,11 @@ public static Collection getKeySchema() { }); } +@Override +StoreType getStoreType() { Review Comment: This is for line 52 above: should we rename that function to a more meaningful one? maybe `getParamStoreType`? ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java: ## @@ -289,7 +289,10 @@ private StoreBuilder> materialize(final MaterializedInt // do not enable cache if the emit final strategy is used if (materialized.cachingEnabled() && emitStrategy.type() != EmitStrategy.StrategyType.ON_WINDOW_CLOSE) { builder.withCachingEnabled(); +} else { +builder.withCachingDisabled(); Review Comment: We would only default at the Materialized level, so the if condition should be sufficient even if the default is to turn on caching, but nevertheless making it explicit seems fine. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] etolbakov commented on a diff in pull request #12388: KAFKA-14013: Limit the length of the `reason` field sent on the wire
etolbakov commented on code in PR #12388: URL: https://github.com/apache/kafka/pull/12388#discussion_r916314304 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java: ## @@ -1112,9 +1121,10 @@ public synchronized RequestFuture maybeLeaveGroup(String leaveReason) { // attempt any resending if the request fails or times out. log.info("Member {} sending LeaveGroup request to coordinator {} due to {}", generation.memberId, coordinator, leaveReason); +final String reason = truncateIfRequired(leaveReason); LeaveGroupRequest.Builder request = new LeaveGroupRequest.Builder( rebalanceConfig.groupId, -Collections.singletonList(new MemberIdentity().setMemberId(generation.memberId).setReason(leaveReason)) +Collections.singletonList(new MemberIdentity().setMemberId(generation.memberId).setReason(reason)) Review Comment: makes sense to inline, thanks! As for the test, though I'm still catching up with the code base, I'll check if the `request` argument of the `send` method could be asserted. https://github.com/apache/kafka/blob/64ac302b1c6baa4b28e6fb90776985ac242d41e3/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1130 Does that sound fine? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jnh5y commented on pull request #12388: KAFKA-14013: Limit the length of the `reason` field sent on the wire
jnh5y commented on PR #12388: URL: https://github.com/apache/kafka/pull/12388#issuecomment-1178280579 > > @etolbakov Thanks for the PR. When the jira is assigned, it usually means that someone is working on it. In this case, I know that @jnh5y wanted to fix this. @jnh5y If you haven't started yet, we could perhaps review this one. > > Thank you @dajac for finding time for a review! Sorry for creating confusion, indeed I should have asked you or @jnh5y first (was too excited about the change 😅). Noted for the future. I will address your suggestions ASAP. No worries, it is all yours! I could have assigned the ticket to myself, etc. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] etolbakov commented on pull request #12388: KAFKA-14013: Limit the length of the `reason` field sent on the wire
etolbakov commented on PR #12388: URL: https://github.com/apache/kafka/pull/12388#issuecomment-1178254261 > @etolbakov Thanks for the PR. When the jira is assigned, it usually means that someone is working on it. In this case, I know that @jnh5y wanted to fix this. @jnh5y If you haven't started yet, we could perhaps review this one. Thank you @dajac for finding time for a review! Sorry for creating confusion, indeed I should have asked you or @jnh5y first (was too excited about the change 😅). Noted for the future. I will address your suggestions ASAP. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-14050) Older clients cannot deserialize ApiVersions response with finalized feature epoch
[ https://issues.apache.org/jira/browse/KAFKA-14050?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-14050. - Resolution: Not A Problem I'm going to close this since the incompatible schema change did not affect any released versions. > Older clients cannot deserialize ApiVersions response with finalized feature > epoch > -- > > Key: KAFKA-14050 > URL: https://issues.apache.org/jira/browse/KAFKA-14050 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Priority: Blocker > Fix For: 3.3.0 > > > When testing kraft locally, we encountered this exception from an older > client: > {code:java} > [ERROR] 2022-07-05 16:45:01,165 [kafka-admin-client-thread | > adminclient-1394] org.apache.kafka.common.utils.KafkaThread > lambda$configureThread$0 - Uncaught exception in thread > 'kafka-admin-client-thread | adminclient-1394': > org.apache.kafka.common.protocol.types.SchemaException: Error reading field > 'api_keys': Error reading array of size 1207959552, only 579 bytes available > at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:118) > at > org.apache.kafka.common.protocol.ApiKeys.parseResponse(ApiKeys.java:378) > at > org.apache.kafka.common.protocol.ApiKeys$1.parseResponse(ApiKeys.java:187) > at > org.apache.kafka.clients.NetworkClient$DefaultClientInterceptor.parseResponse(NetworkClient.java:1333) > at > org.apache.kafka.clients.NetworkClient.parseStructMaybeUpdateThrottleTimeMetrics(NetworkClient.java:752) > at > org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:888) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:577) > at > org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1329) > at > org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1260) > at java.base/java.lang.Thread.run(Thread.java:832) {code} > The cause appears to be from a change to the type of the > `FinalizedFeaturesEpoch` field in the `ApiVersions` response from int32 to > int64: > [https://github.com/apache/kafka/pull/9001/files#diff-32006e8becae918416debdb9ac76bf8a1ad12b83aaaf5f8819b6ecc00c1fb56bR58.] > Fortunately, `FinalizedFeaturesEpoch` is a tagged field, so we can fix this > by creating a new field. We will have to leave the existing tag in the > protocol spec and consider it dead. > Credit for this find goes to [~dajac] . -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] hachikuji opened a new pull request, #12390: KAFKA-14055; Txn markers should not be removed by matching records in the offset map
hachikuji opened a new pull request, #12390: URL: https://github.com/apache/kafka/pull/12390 When cleaning a topic with transactional data, if the keys used in the user data happen to conflict with the keys in the transaction markers, it is possible for the markers to get removed before the corresponding data from the transaction is removed. This results in a hanging transaction or the loss of the transaction's atomicity, since the data would effectively get bundled into the next transaction. Currently control records are excluded when building the offset map, but not when doing the cleaning. This patch fixes the problem by checking for control batches in the `shouldRetainRecord` callback. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
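A hedged sketch of the check the last paragraph describes: during the cleaning pass, records in control batches (transaction markers) must not be discarded by the key-based offset map, because their retention is decided separately from key deduplication. The names below are illustrative stand-ins, not the actual LogCleaner code:

```java
import java.nio.ByteBuffer;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;

final class CleaningSketch {
    /** Illustrative stand-in for kafka.log.OffsetMap. */
    interface OffsetMap {
        long get(ByteBuffer key);
    }

    // Sketch of the fixed predicate: control batches are retained regardless of
    // key conflicts with user data; everything else is deduplicated by the map.
    static boolean shouldRetainRecord(final RecordBatch batch,
                                      final Record record,
                                      final OffsetMap offsets) {
        if (batch.isControlBatch()) {
            return true; // marker removal is governed by transaction metadata, not by keys
        }
        if (!record.hasKey()) {
            return false; // keyless records are dropped by compaction
        }
        final long latestOffsetForKey = offsets.get(record.key());
        return record.offset() >= latestOffsetForKey;
    }
}
```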
[GitHub] [kafka] mjsax commented on pull request #12091: KAFKA-12943: update aggregating documentation
mjsax commented on PR #12091: URL: https://github.com/apache/kafka/pull/12091#issuecomment-1178222919 Thanks for the PR! Merged to `trunk`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax merged pull request #12091: KAFKA-12943: update aggregating documentation
mjsax merged PR #12091: URL: https://github.com/apache/kafka/pull/12091 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on pull request #11442: KAFKA-7883 add schema.namespace support to SetSchemaMetadata SMT in Kafka Connect
mjsax commented on PR #11442: URL: https://github.com/apache/kafka/pull/11442#issuecomment-1178218780 I am not familiar with Connect code -- @rhauch @kkonstantine @mimaison should be able to help. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a diff in pull request #12370: KAFKA-13785: [10/N][emit final] more unit test for session store and disable cache for emit final sliding window
mjsax commented on code in PR #12370: URL: https://github.com/apache/kafka/pull/12370#discussion_r916256345 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java: ## @@ -289,7 +289,10 @@ private StoreBuilder> materialize(final MaterializedInt // do not enable cache if the emit final strategy is used if (materialized.cachingEnabled() && emitStrategy.type() != EmitStrategy.StrategyType.ON_WINDOW_CLOSE) { builder.withCachingEnabled(); +} else { +builder.withCachingDisabled(); Review Comment: Maybe @guozhangwang can chime in? I am happy both ways I guess, but we should keep it consistent if possible. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-12943) Bug in Kafka Streams Documentation (Aggregating)
[ https://issues.apache.org/jira/browse/KAFKA-12943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17563944#comment-17563944 ] Marco Lotz commented on KAFKA-12943: The PR was raised in April and approved twice. Is there anything else missing to merge it to trunk? > Bug in Kafka Streams Documentation (Aggregating) > > > Key: KAFKA-12943 > URL: https://issues.apache.org/jira/browse/KAFKA-12943 > Project: Kafka > Issue Type: Bug > Components: documentation, streams > Reporter: Rajesh KSV > Assignee: Marco Lotz > Priority: Minor > > In the doc, for the aggregating function, the example is incorrect: > [https://kafka.apache.org/documentation/streams/developer-guide/dsl-api.html#aggregating] > It says > {code:java} > KTable<byte[], Long> aggregatedStream = groupedStream.aggregate( > () -> 0L, /* initializer */ > (aggKey, newValue, aggValue) -> aggValue + newValue.length(), /* adder */ > Materialized.as("aggregated-stream-store") /* state store name */ > .withValueSerde(Serdes.Long()); /* serde for aggregate value */{code} > Generic types are missing. Instead, it should be > {code:java} > KTable<byte[], Long> aggregatedStream = groupedStream.aggregate( > () -> 0L, /* initializer */ > (aggKey, newValue, aggValue) -> aggValue + newValue.length(), /* adder */ > Materialized.<byte[], Long, KeyValueStore<Bytes, byte[]>>as("aggregated-stream-store") /* state store name */ > .withValueSerde(Serdes.Long())); /* serde for aggregate value */ {code} > Otherwise, the code won't work. I verified it myself. > Reference: [https://stackoverflow.com/questions/51040555/the-method-withvalueserde-in-the-type-materialized-is-not-applicable/51049472] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] vineethgn commented on pull request #9188: break when dst is full so that unwrap isn't called when appreadbuffer…
vineethgn commented on PR #9188: URL: https://github.com/apache/kafka/pull/9188#issuecomment-1178145392 Based on the discussion in https://github.com/apache/kafka/pull/5785/, I tried the fix in this PR, but it didn't work for me. I can reproduce the issue consistently under the following conditions: 1. [Conscrypt](https://github.com/google/conscrypt) as the SSL provider 2. openjdk-11-jdk I checked by adding additional logs and found that the packet buffer size and the app buffer size returned by the Conscrypt SSL engine mismatch. Below are the logs that I got: SSl engine class org.conscrypt.Java8EngineWrapper (org.apache.kafka.common.network.SslTransportLayer) SSl engine session class org.conscrypt.Java8ExtendedSSLSession (org.apache.kafka.common.network.SslTransportLayer) SSl engine session App Buffer Size 16384 (org.apache.kafka.common.network.SslTransportLayer) SSl engine session Net Buffer Size 16709 (org.apache.kafka.common.network.SslTransportLayer) Since this was a blocker, I modified the SslTransportLayer to make appReadBuffer at least the same size as netReadBuffer before the call to unwrap. Is it OK to have appReadBuffer at least the same size as netReadBuffer? So far the clusters are running smoothly without buffer overflow or underflow issues. Below is the code fix I made; it has been working fine so far without further issues.

while (netReadBuffer.position() > 0) {
    netReadBuffer.flip();
    SSLEngineResult unwrapResult;
    try {
        // Grow appReadBuffer to at least the net (packet) buffer size before unwrap,
        // since Conscrypt reports an application buffer smaller than its packet buffer.
        appReadBuffer = Utils.ensureCapacity(appReadBuffer, netReadBufferSize());
        unwrapResult = sslEngine.unwrap(netReadBuffer, appReadBuffer);

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-14055) Transaction markers may be lost during cleaning if data keys conflict with marker keys
[ https://issues.apache.org/jira/browse/KAFKA-14055?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson reassigned KAFKA-14055: --- Assignee: Jason Gustafson > Transaction markers may be lost during cleaning if data keys conflict with > marker keys > -- > > Key: KAFKA-14055 > URL: https://issues.apache.org/jira/browse/KAFKA-14055 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Critical > Fix For: 3.3.0, 3.0.2, 3.1.2, 3.2.1 > > > We have been seeing recently hanging transactions occur on streams changelog > topics quite frequently. After investigation, we found that the keys used in > the changelog topic conflict with the keys used in the transaction markers > (the schema used in control records is 4 bytes, which happens to be the same > for the changelog topics that we investigated). When we build the offset map > prior to cleaning, we do properly exclude the transaction marker keys, but > the bug is the fact that we do not exclude them during the cleaning phase. > This can result in the marker being removed from the cleaned log before the > corresponding data is removed when there is a user record with a conflicting > key at a higher offset. A side effect of this is a so-called "hanging" > transaction, but the bigger problem is that we lose the atomicity of the > transaction. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-14055) Transaction markers may be lost during cleaning if data keys conflict with marker keys
Jason Gustafson created KAFKA-14055: --- Summary: Transaction markers may be lost during cleaning if data keys conflict with marker keys Key: KAFKA-14055 URL: https://issues.apache.org/jira/browse/KAFKA-14055 Project: Kafka Issue Type: Bug Reporter: Jason Gustafson Fix For: 3.3.0, 3.0.2, 3.1.2, 3.2.1 We have been seeing recently hanging transactions occur on streams changelog topics quite frequently. After investigation, we found that the keys used in the changelog topic conflict with the keys used in the transaction markers (the schema used in control records is 4 bytes, which happens to be the same for the changelog topics that we investigated). When we build the offset map prior to cleaning, we do properly exclude the transaction marker keys, but the bug is the fact that we do not exclude them during the cleaning phase. This can result in the marker being removed from the cleaned log before the corresponding data is removed when there is a user record with a conflicting key at a higher offset. A side effect of this is a so-called "hanging" transaction, but the bigger problem is that we lose the atomicity of the transaction. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] guozhangwang merged pull request #12297: KAFKA-13846: Follow up PR to address review comments
guozhangwang merged PR #12297: URL: https://github.com/apache/kafka/pull/12297 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a diff in pull request #12297: KAFKA-13846: Follow up PR to address review comments
guozhangwang commented on code in PR #12297: URL: https://github.com/apache/kafka/pull/12297#discussion_r916180825 ## docs/upgrade.html: ## @@ -19,6 +19,56 @@
[GitHub] [kafka] pjmagee commented on pull request #11442: KAFKA-7883 add schema.namespace support to SetSchemaMetadata SMT in Kafka Connect
pjmagee commented on PR #11442: URL: https://github.com/apache/kafka/pull/11442#issuecomment-1178051631 @mjsax I can see you have been active here recently; is there any way we can get @mnegodaev's PR merged? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #12388: KAFKA-14013: Limit the length of the `reason` field sent on the wire
dajac commented on code in PR #12388: URL: https://github.com/apache/kafka/pull/12388#discussion_r916162985 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java: ## @@ -1050,10 +1051,18 @@ public synchronized void requestRejoin(final String shortReason) { public synchronized void requestRejoin(final String shortReason, final String fullReason) { log.info("Request joining group due to: {}", fullReason); -this.rejoinReason = shortReason; +this.rejoinReason = truncateIfRequired(shortReason); Review Comment: In my opinion, it would be better to do this when the JoinGroupRequestData is created. It is here: https://github.com/apache/kafka/blob/64ac302b1c6baa4b28e6fb90776985ac242d41e3/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L563 This ensures that we cover all the paths. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a diff in pull request #12388: KAFKA-14013: Limit the length of the `reason` field sent on the wire
dajac commented on code in PR #12388: URL: https://github.com/apache/kafka/pull/12388#discussion_r916166251 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java: ## @@ -1112,9 +1121,10 @@ public synchronized RequestFuture maybeLeaveGroup(String leaveReason) { // attempt any resending if the request fails or times out. log.info("Member {} sending LeaveGroup request to coordinator {} due to {}", generation.memberId, coordinator, leaveReason); +final String reason = truncateIfRequired(leaveReason); LeaveGroupRequest.Builder request = new LeaveGroupRequest.Builder( rebalanceConfig.groupId, -Collections.singletonList(new MemberIdentity().setMemberId(generation.memberId).setReason(leaveReason)) +Collections.singletonList(new MemberIdentity().setMemberId(generation.memberId).setReason(reason)) Review Comment: nit: Should we call `truncateIfRequired` inline here? Is it possible to also test this path? ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java: ## @@ -1050,10 +1051,18 @@ public synchronized void requestRejoin(final String shortReason) { public synchronized void requestRejoin(final String shortReason, final String fullReason) { log.info("Request joining group due to: {}", fullReason); -this.rejoinReason = shortReason; +this.rejoinReason = truncateIfRequired(shortReason); Review Comment: In my opinion, it would be better to do this when the JoinGroupRequestData is created. It is here: https://github.com/apache/kafka/blob/64ac302b1c6baa4b28e6fb90776985ac242d41e3/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L563. This ensures that we cover all the paths. ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java: ## @@ -571,6 +571,11 @@ public void testRejoinReason() { expectSyncGroup(generation, memberId); ensureActiveGroup(generation, memberId); assertEquals("", coordinator.rejoinReason()); + +// check limit length of reason field + mockClient.prepareResponse(joinGroupFollowerResponse(defaultGeneration, memberId, JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.GROUP_MAX_SIZE_REACHED)); Review Comment: It would be better to do a test similar to the one at L557. That one verifies the content put in the JoinGroupRequest as well. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
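For orientation while reading these review comments, here is a minimal sketch of what a helper like `truncateIfRequired` could look like. The 255-character cap is an assumption based on the short-string `reason` fields; the exact limit and where it is called from are precisely what this review is settling:

```java
final class ReasonTruncation {
    // Assumed cap; the `reason` fields in JoinGroup/LeaveGroup are short strings,
    // so the client should bound what it puts on the wire.
    private static final int MAX_REASON_LENGTH = 255;

    static String truncateIfRequired(final String reason) {
        if (reason != null && reason.length() > MAX_REASON_LENGTH) {
            return reason.substring(0, MAX_REASON_LENGTH);
        }
        return reason;
    }
}
```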
[GitHub] [kafka] rayokota commented on pull request #12126: KAFKA-8713 KIP-581: Add new conf serialize.accept.optional.null in connect-json
rayokota commented on PR #12126: URL: https://github.com/apache/kafka/pull/12126#issuecomment-1178039321 The fix is not quite correct. You need to conditionally call `struct.getWithoutDefault(field.name())`. You should also add a test using a struct. See the corresponding fix for the JSON Schema converter in https://github.com/confluentinc/schema-registry/pull/2326 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
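A sketch of the conditional call rayokota is asking for, lifted out as a helper. The `acceptOptionalNull` flag is an assumption derived from the PR's `serialize.accept.optional.null` config name; the surrounding converter plumbing is not shown:

```java
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;

final class StructConversionSketch {
    static Object fieldValue(final Struct struct, final Field field, final boolean acceptOptionalNull) {
        if (acceptOptionalNull && field.schema().isOptional()) {
            // Preserve an explicitly-set null instead of substituting the schema default.
            return struct.getWithoutDefault(field.name());
        }
        return struct.get(field); // default behavior: fall back to the schema default
    }
}
```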
[GitHub] [kafka] dajac commented on pull request #12388: KAFKA-14013: Limit the length of the `reason` field sent on the wire
dajac commented on PR #12388: URL: https://github.com/apache/kafka/pull/12388#issuecomment-1178035300 @etolbakov Thanks for the PR. When the jira is assigned, it usually means that someone is working on it. In this case, I know that @jnh5y wanted to fix this. @jnh5y If you haven't started yet, we could perhaps review this one. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lihaosky commented on a diff in pull request #12370: KAFKA-13785: [10/N][emit final] more unit test for session store and disable cache for emit final sliding window
lihaosky commented on code in PR #12370: URL: https://github.com/apache/kafka/pull/12370#discussion_r916160977 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java: ## @@ -289,7 +289,10 @@ private StoreBuilder> materialize(final MaterializedInt // do not enable cache if the emit final strategy is used if (materialized.cachingEnabled() && emitStrategy.type() != EmitStrategy.StrategyType.ON_WINDOW_CLOSE) { builder.withCachingEnabled(); +} else { +builder.withCachingDisabled(); Review Comment: I want to make it explicit in case the default is caching enabled. I can remove it if you think it's unnecessary. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax merged pull request #12383: REVERT: Kip-770
mjsax merged PR #12383: URL: https://github.com/apache/kafka/pull/12383 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on pull request #12383: REVERT: Kip-770
guozhangwang commented on PR #12383: URL: https://github.com/apache/kafka/pull/12383#issuecomment-1178024477 Thanks @mjsax , LGTM! Please feel free to merge. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jsancio commented on pull request #12274: KAFKA-13959: Controller should unfence Broker with busy metadata log
jsancio commented on PR #12274: URL: https://github.com/apache/kafka/pull/12274#issuecomment-1177949852 > I haven't fleshed it out; the basic idea is shown in this PR: only read up to Isolation.COMMITTED when reading from an observer, and invoke fetchPurgatory.maybeComplete(highWatermark.offset, currentTimeMs) in onUpdateLeaderHighWatermark. I see. This would not work for co-located Kafka servers, right? Co-located Kafka servers are servers that run both a controller and a broker. In that case the replica will read uncommitted data and the leader will not send a FETCH response when the HW changes. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
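Restated as a sketch, the idea quoted above (and the co-location concern) looks roughly like this. `Isolation.COMMITTED` and `fetchPurgatory.maybeComplete(...)` echo the comment itself, while `isVoter` and the purgatory type are assumptions, not the actual KafkaRaftClient code:

```java
// Observers fetch only committed data; the co-location question is whether a
// broker replica running alongside a controller should be classified like an
// observer here, since it would then see only committed records.
Isolation isolationForReader(final int replicaId) {
    return isVoter(replicaId) ? Isolation.UNCOMMITTED : Isolation.COMMITTED;
}

void onUpdateLeaderHighWatermark(final LogOffsetMetadata highWatermark, final long currentTimeMs) {
    // Complete FETCH requests that were parked waiting for newly committed data.
    fetchPurgatory.maybeComplete(highWatermark.offset, currentTimeMs);
}
```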
[GitHub] [kafka] methodmissing commented on pull request #12385: MINOR: Expose client information on RequestContext as additional public API beyond request logs (continuation of KIP 511)
methodmissing commented on PR #12385: URL: https://github.com/apache/kafka/pull/12385#issuecomment-1177916112 Thanks! I'll draft something > I forgot to mention that AuthorizableRequestContext is a public interface so we need a KIP for this change. The process is described here: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] methodmissing commented on pull request #12385: MINOR: Expose client information on RequestContext as additional public API beyond request logs (continuation of KIP 511)
methodmissing commented on PR #12385: URL: https://github.com/apache/kafka/pull/12385#issuecomment-1177906495 @dajac thanks for the prompt reply. We at Shopify currently have a custom authoriser deployed where we log context like `principal`, `clientId`, `topic` etc. to producer- and consumer-metrics-specific compacted topics to build up insights into our consumer landscape. We'd also be interested in collecting additional information about the clients, given that our client landscape (and this is true for most larger organisations) is diverse and varies across `Ruby`, `Java`, `Go`, and `Python`, plus variations of versions among those. Given that KIP-511 introduced support for tracking client information per connection and this information is already present in a request context, it could be valuable to also expose it to plugins. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
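Concretely, the extension under discussion would look something like this on the public interface. The method names are assumed from the existing `ClientInformation` class introduced by KIP-511 (`softwareName()`/`softwareVersion()`); nothing here is a committed API, which is why a KIP is requested below:

```java
public interface AuthorizableRequestContext {
    // ... existing methods such as principal(), clientId(), requestType() ...

    /** Client software name reported via ApiVersions (KIP-511), e.g. "apache-kafka-java". */
    String clientSoftwareName();

    /** Client software version reported via ApiVersions (KIP-511), e.g. "3.2.0". */
    String clientSoftwareVersion();
}
```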
[GitHub] [kafka] dajac commented on pull request #12385: MINOR: Expose client information on RequestContext as additional public API beyond request logs (continuation of KIP 511)
dajac commented on PR #12385: URL: https://github.com/apache/kafka/pull/12385#issuecomment-1177901616 @methodmissing I forgot to mention that `AuthorizableRequestContext` is a public interface so we need a KIP for this change. The process is described here: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jsancio commented on a diff in pull request #12265: KAFKA-13968: Fix 3 major bugs of KRaft snapshot generating
jsancio commented on code in PR #12265: URL: https://github.com/apache/kafka/pull/12265#discussion_r916060600 ## core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala: ## @@ -117,26 +117,32 @@ class BrokerMetadataListener( } finally { reader.close() } - _publisher.foreach(publish) - // If we detected a change in metadata.version, generate a local snapshot - val metadataVersionChanged = Option(_delta.featuresDelta()).exists { featuresDelta => -featuresDelta.metadataVersionChange().isPresent + _bytesSinceLastSnapshot = _bytesSinceLastSnapshot + results.numBytes + if (shouldSnapshot()) { +maybeStartSnapshot() } - snapshotter.foreach { snapshotter => -_bytesSinceLastSnapshot = _bytesSinceLastSnapshot + results.numBytes -if (shouldSnapshot() || metadataVersionChanged) { - if (snapshotter.maybeStartSnapshot(_highestTimestamp, _delta.apply())) { -_bytesSinceLastSnapshot = 0L - } -} - } + _publisher.foreach(publish) } } private def shouldSnapshot(): Boolean = { -_bytesSinceLastSnapshot >= maxBytesBetweenSnapshots +(_bytesSinceLastSnapshot >= maxBytesBetweenSnapshots) || metadataVersionChanged() + } + + private def metadataVersionChanged(): Boolean = { +_publisher.nonEmpty && Option(_delta.featuresDelta()).exists { featuresDelta => Review Comment: I see. Before this PR the broker was generating a snapshot when handling committed records that didn't contain a feature change because we only reset `_delta` after the listener calls `publish`. This PR makes sense to me. Let's document this decision in this method. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] tombentley commented on a diff in pull request #12347: KAFKA-13919: expose log recovery metrics
tombentley commented on code in PR #12347: URL: https://github.com/apache/kafka/pull/12347#discussion_r916044426 ## core/src/main/scala/kafka/log/LogManager.scala: ## @@ -307,6 +313,22 @@ class LogManager(logDirs: Seq[File], log } + import java.util.concurrent.ThreadFactory + + // factory class for naming the log recovery threads used in metrics + class LogRecoveryThreadFactory(val baseName: String) extends ThreadFactory { +val threadsNum = new AtomicInteger(0) Review Comment: Singular `threadNum`? ## core/src/main/scala/kafka/log/LogManager.scala: ## @@ -410,12 +437,34 @@ class LogManager(logDirs: Seq[File], error(s"There was an error in one of the threads during logs loading: ${e.getCause}") throw e.getCause } finally { + removeLogRecoveryMetrics() threadPools.foreach(_.shutdown()) } info(s"Loaded $numTotalLogs logs in ${time.hiResClockMs() - startMs}ms.") } + private[log] def addLogRecoveryMetrics(): Unit = { +for (dir <- logDirs) { + newGauge("remainingLogsToRecover", () => numRemainingLogs.get(dir.getAbsolutePath), +Map("dir" -> dir.getAbsolutePath)) + for (i <- 0 until numRecoveryThreadsPerDataDir) { +val threadName = s"log-recovery-${dir.getAbsolutePath}-$i" +newGauge("remainingSegmentsToRecover", () => numRemainingSegments.get(threadName), + Map("dir" -> dir.getAbsolutePath, "threadNum" -> i.toString)) + } +} + } + + private[log] def removeLogRecoveryMetrics(): Unit = { +for (dir <- logDirs) { + removeMetric("remainingLogsToRecover", Map("dir" -> dir.getAbsolutePath)) + for (i <- 0 until numRecoveryThreadsPerDataDir) { Review Comment: `numRecoveryThreadsPerDataDir` can be changed, so what happens if it changes after the metrics are added and before they're removed? ## core/src/main/scala/kafka/log/LogManager.scala: ## @@ -410,12 +437,34 @@ class LogManager(logDirs: Seq[File], error(s"There was an error in one of the threads during logs loading: ${e.getCause}") throw e.getCause } finally { + removeLogRecoveryMetrics() threadPools.foreach(_.shutdown()) } info(s"Loaded $numTotalLogs logs in ${time.hiResClockMs() - startMs}ms.") } + private[log] def addLogRecoveryMetrics(): Unit = { +for (dir <- logDirs) { + newGauge("remainingLogsToRecover", () => numRemainingLogs.get(dir.getAbsolutePath), +Map("dir" -> dir.getAbsolutePath)) + for (i <- 0 until numRecoveryThreadsPerDataDir) { +val threadName = s"log-recovery-${dir.getAbsolutePath}-$i" Review Comment: Can we encapsulate this in a method, since it's duplicating the logic in the ThreadFactory? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
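The helper tombentley asks for might look like the following, sketched in Java for brevity even though LogManager is Scala; the format string is taken from the diff above:

```java
// Single source of truth for recovery-thread names, shared by the ThreadFactory
// and by the metric registration/removal loops so the two can never drift apart.
static String logRecoveryThreadName(final String dirAbsolutePath, final int threadNum) {
    return "log-recovery-" + dirAbsolutePath + "-" + threadNum;
}
```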
[GitHub] [kafka] tombentley commented on a diff in pull request #12296: KAFKA-13996: log.cleaner.io.max.bytes.per.second can be changed dynamically
tombentley commented on code in PR #12296: URL: https://github.com/apache/kafka/pull/12296#discussion_r916026938 ## core/src/main/scala/kafka/log/LogCleaner.scala: ## @@ -186,11 +186,18 @@ class LogCleaner(initialConfig: CleanerConfig, } /** * Reconfigure log clean config. It will (1) update desiredRatePerSec in Throttler with logCleanerIoMaxBytesPerSecond if necessary (2) stop current log cleaners and create new ones. Review Comment: ```suggestion * Reconfigure log clean config. This will: * 1. update desiredRatePerSec in Throttler with logCleanerIoMaxBytesPerSecond, if necessary * 2. stop current log cleaners and create new ones. ``` ## core/src/main/scala/kafka/utils/Throttler.scala: ## @@ -36,7 +36,7 @@ import scala.math._ * @param time: The time implementation to use */ @threadsafe -class Throttler(desiredRatePerSec: Double, +class Throttler(var desiredRatePerSec: Double, Review Comment: That seems reasonable. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
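On the `var desiredRatePerSec` change: the subtlety is visibility, since reconfiguration happens on a different thread than the cleaner's throttling loop. A Java sketch of the minimal safe-publication shape (this illustrates the design concern, not the merged Scala code, which guards the field its own way under the `@threadsafe` contract):

```java
final class ThrottlerSketch {
    // volatile so cleaner threads observe a dynamic update immediately.
    private volatile double desiredRatePerSec;

    ThrottlerSketch(final double initialRatePerSec) {
        this.desiredRatePerSec = initialRatePerSec;
    }

    // Called from reconfigure() when log.cleaner.io.max.bytes.per.second changes.
    void updateDesiredRatePerSec(final double newRate) {
        this.desiredRatePerSec = newRate;
    }

    double desiredRatePerSec() {
        return desiredRatePerSec;
    }
}
```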
[jira] [Commented] (KAFKA-14048) The Next Generation of the Consumer Rebalance Protocol
[ https://issues.apache.org/jira/browse/KAFKA-14048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17563831#comment-17563831 ] Travis Bischel commented on KAFKA-14048: From a technical perspective, this is a very interesting and compelling proposal. Can the KIP list be updated to include this? I've noticed the KIP list has not been updated much and there are a good few KIPs missing. Also, there is no public discussion link: has there been any public discussion on this proposal? > The Next Generation of the Consumer Rebalance Protocol > -- > > Key: KAFKA-14048 > URL: https://issues.apache.org/jira/browse/KAFKA-14048 > Project: Kafka > Issue Type: Improvement > Reporter: David Jacot > Assignee: David Jacot > Priority: Major > > This Jira tracks the development of KIP-848: https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] jsancio commented on a diff in pull request #12265: KAFKA-13968: Fix 3 major bugs of KRaft snapshot generating
jsancio commented on code in PR #12265: URL: https://github.com/apache/kafka/pull/12265#discussion_r915992223 ## core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala: ## @@ -117,26 +117,32 @@ class BrokerMetadataListener( } finally { reader.close() } - _publisher.foreach(publish) - // If we detected a change in metadata.version, generate a local snapshot - val metadataVersionChanged = Option(_delta.featuresDelta()).exists { featuresDelta => -featuresDelta.metadataVersionChange().isPresent + _bytesSinceLastSnapshot = _bytesSinceLastSnapshot + results.numBytes + if (shouldSnapshot()) { +maybeStartSnapshot() } - snapshotter.foreach { snapshotter => -_bytesSinceLastSnapshot = _bytesSinceLastSnapshot + results.numBytes -if (shouldSnapshot() || metadataVersionChanged) { - if (snapshotter.maybeStartSnapshot(_highestTimestamp, _delta.apply())) { -_bytesSinceLastSnapshot = 0L - } -} - } + _publisher.foreach(publish) } } private def shouldSnapshot(): Boolean = { -_bytesSinceLastSnapshot >= maxBytesBetweenSnapshots +(_bytesSinceLastSnapshot >= maxBytesBetweenSnapshots) || metadataVersionChanged() + } + + private def metadataVersionChanged(): Boolean = { +_publisher.nonEmpty && Option(_delta.featuresDelta()).exists { featuresDelta => Review Comment: Do you mind writing a comment as to why we check that the publisher is set. If I understand this correctly it is not a correctness issue but a performance issue, right? If I remember correctly, @mumrah mentioned that he wanted to generate a snapshot whenever the metadata version changes. Unfortunately, I couldn't find a mention of this in KIP-778. With this change this is no longer true. What do you think @mumrah ? ## core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala: ## @@ -240,6 +239,42 @@ class BrokerMetadataListenerTest { } } + @Test + def testNotSnapshotAfterMetadataVersionChangeBeforePublishing(): Unit = { +val snapshotter = new MockMetadataSnapshotter() +val listener = newBrokerMetadataListener(snapshotter = Some(snapshotter), + maxBytesBetweenSnapshots = 1000L) + +updateFeature(listener, feature = MetadataVersion.FEATURE_NAME, MetadataVersion.latest.featureLevel(), 100L) +listener.getImageRecords().get() +assertEquals(-1L, snapshotter.activeSnapshotOffset, "We won't generate snapshot on metadata version change before starting publishing") + } + + @Test + def testSnapshotAfterMetadataVersionChangeWhenStarting(): Unit = { +val snapshotter = new MockMetadataSnapshotter() +val listener = newBrokerMetadataListener(snapshotter = Some(snapshotter), + maxBytesBetweenSnapshots = 1000L) + +val endOffset = 100L +updateFeature(listener, feature = MetadataVersion.FEATURE_NAME, MetadataVersion.latest.featureLevel(), endOffset) +listener.startPublishing(new MockMetadataPublisher()).get() +assertEquals(endOffset, snapshotter.activeSnapshotOffset, "We should try to generate snapshot when starting publishing") + } + + @Test + def testSnapshotAfterMetadataVersionChange(): Unit = { +val snapshotter = new MockMetadataSnapshotter() +val listener = newBrokerMetadataListener(snapshotter = Some(snapshotter), + maxBytesBetweenSnapshots = 1000L) +listener.startPublishing(new MockMetadataPublisher()).get() + +val endOffset = 100L +updateFeature(listener, feature = MetadataVersion.FEATURE_NAME, (MetadataVersion.latest().featureLevel() - 1).toShort, endOffset) +listener.getImageRecords().get() Review Comment: Let's write a comment saying that this `get` is waiting for the metadata version update above to get processed. 
## core/src/main/scala/kafka/server/metadata/BrokerMetadataSnapshotter.scala: ## @@ -51,20 +51,26 @@ class BrokerMetadataSnapshotter( val eventQueue = new KafkaEventQueue(time, logContext, threadNamePrefix.getOrElse("")) override def maybeStartSnapshot(lastContainedLogTime: Long, image: MetadataImage): Boolean = synchronized { -if (_currentSnapshotOffset == -1L) { +if (_currentSnapshotOffset != -1) { + info(s"Declining to create a new snapshot at ${image.highestOffsetAndEpoch()} because " + +s"there is already a snapshot in progress at offset ${_currentSnapshotOffset}") Review Comment: Nit, the curly brackets ({}) are not needed for `_currentSnapshotOffset`. This comment applies to a few places. ## core/src/main/scala/kafka/server/metadata/BrokerMetadataSnapshotter.scala: ## @@ -109,4 +115,9 @@ class BrokerMetadataSnapshotter( beginShutdown() eventQueue.close() } + + // VisibleForTesting + def currentSnapshotOffset(): Long = { +_currentSnapshotOffset + } Review Comment: It doesn't look like you use th
[jira] [Created] (KAFKA-14054) Unexpected client shutdown as TimeoutException is thrown as IllegalStateException
Donald created KAFKA-14054: -- Summary: Unexpected client shutdown as TimeoutException is thrown as IllegalStateException Key: KAFKA-14054 URL: https://issues.apache.org/jira/browse/KAFKA-14054 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 3.1.1, 3.2.0, 3.1.0 Reporter: Donald Re: https://forum.confluent.io/t/bug-timeoutexception-is-thrown-as-illegalstateexception-causing-client-shutdown/5460/2 1) TimeoutException is thrown as IllegalStateException in {_}org.apache.kafka.streams.processor.internals.StreamTask#commitNeeded{_}, which causes the client to shut down in {_}org.apache.kafka.streams.KafkaStreams#getActionForThrowable{_}. 2) Should Timeout be a recoverable error that is expected to be handled by the user? 3) This issue is exposed by change KAFKA-12887, which was introduced in kafka-streams version 3.1.0. *code referenced* {code:java|title=org.apache.kafka.streams.processor.internals.StreamTask#commitNeeded} public boolean commitNeeded() { if (commitNeeded) { return true; } else { for (final Map.Entry<TopicPartition, Long> entry : consumedOffsets.entrySet()) { final TopicPartition partition = entry.getKey(); try { final long offset = mainConsumer.position(partition); if (offset > entry.getValue() + 1) { commitNeeded = true; entry.setValue(offset - 1); } } catch (final TimeoutException error) { // the `consumer.position()` call should never block, because we know that we did process data // for the requested partition and thus the consumer should have a valid local position // that it can return immediately // hence, a `TimeoutException` indicates a bug and thus we rethrow it as fatal `IllegalStateException` throw new IllegalStateException(error); } catch (final KafkaException fatal) { throw new StreamsException(fatal); } } return commitNeeded; } } {code} {code:java|title=org.apache.kafka.streams.KafkaStreams#getActionForThrowable} private StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse getActionForThrowable(final Throwable throwable, final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) { final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse action; if (wrappedExceptionIsIn(throwable, EXCEPTIONS_NOT_TO_BE_HANDLED_BY_USERS)) { action = SHUTDOWN_CLIENT; } else { action = streamsUncaughtExceptionHandler.handle(throwable); } return action; } private void handleStreamsUncaughtException(final Throwable throwable, final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler, final boolean skipThreadReplacement) { final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse action = getActionForThrowable(throwable, streamsUncaughtExceptionHandler); if (oldHandler) { log.warn("Stream's new uncaught exception handler is set as well as the deprecated old handler." + "The old handler will be ignored as long as a new handler is set."); } switch (action) { case REPLACE_THREAD: if (!skipThreadReplacement) { log.error("Replacing thread in the streams uncaught exception handler", throwable); replaceStreamThread(throwable); } else { log.debug("Skipping thread replacement for recoverable error"); } break; case SHUTDOWN_CLIENT: log.error("Encountered the following exception during processing " + "and Kafka Streams opted to " + action + "." + " The streams client is going to shut down now. ", throwable); closeToError(); break; {code} {code:java|title=error log kafka-streams v. 3.1.0}
3.1.0} 2022-06-22 13:58:35,796 ERROR thread=[com_stmartin_hammer_v3_platform-hammer-facade-fdc90fab-ed3a-4e62-b458-e73f80e6975d-StreamThread-1] logger=o.a.k.s.KafkaStreams - stream-client [com_stmartin_hammer_v3_platform-hammer-facade-fdc90fab-ed3a-4e62-b458-e73f80e6975d] Encountered the following exception during processing and Kafka Streams opted to SHUTDOWN_CLIENT. The streams client is going to shut down now. org.apache.kafka.streams.errors.StreamsException: java.lang.Illega
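For context, the user-facing hook involved here is the Streams uncaught exception handler. A minimal Scala sketch of installing it, using the public `KafkaStreams#setUncaughtExceptionHandler` API (the `streams` instance is hypothetical); the point of the ticket is that because the timeout is rethrown as `IllegalStateException`, which KafkaStreams treats as not-to-be-handled-by-users, this handler is bypassed and the client shuts down:

```scala
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse

object HandlerSketch {
  // `streams` is an already-built KafkaStreams instance (hypothetical here).
  def installHandler(streams: KafkaStreams): Unit =
    streams.setUncaughtExceptionHandler(new StreamsUncaughtExceptionHandler {
      override def handle(t: Throwable): StreamThreadExceptionResponse =
        // Prefer replacing the failed thread over shutting down the client.
        // For the wrapped IllegalStateException described above, this
        // handler is never consulted, which is the crux of the report.
        StreamThreadExceptionResponse.REPLACE_THREAD
    })
}
```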
[GitHub] [kafka] dajac commented on pull request #12385: MINOR: Expose client information on RequestContext as additional public API beyond request logs (continuation of KIP 511)
dajac commented on PR #12385: URL: https://github.com/apache/kafka/pull/12385#issuecomment-1177681675 @methodmissing Thanks for the PR. Could you elaborate a little more on the motivation for this change? Also, don't we need to extend the `AuthorizableRequestContext` if you want to get the client information in the plugins?
[GitHub] [kafka] singhnama commented on a diff in pull request #12359: KAFKA-13983: Fail the creation with "/" in resource name in zk ACL
singhnama commented on code in PR #12359: URL: https://github.com/apache/kafka/pull/12359#discussion_r915906279

## core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala:

@@ -209,6 +213,9 @@ class AclAuthorizer extends Authorizer with Logging {
       throw new UnsupportedVersionException(s"Adding ACLs on prefixed resource patterns requires " +
         s"${KafkaConfig.InterBrokerProtocolVersionProp} of $IBP_2_0_IV1 or greater")
     }
+    if (inValidAclBindingResourceName(aclBinding.pattern().name())) {

Review Comment: updated
[GitHub] [kafka] showuon commented on a diff in pull request #12296: KAFKA-13996: log.cleaner.io.max.bytes.per.second can be changed dynamically
showuon commented on code in PR #12296: URL: https://github.com/apache/kafka/pull/12296#discussion_r915850442

## core/src/test/scala/unit/kafka/log/LogCleanerTest.scala:

@@ -1856,7 +1856,7 @@ class LogCleanerTest {
   @Test
   def testReconfigureLogCleanerIoMaxBytesPerSecond(): Unit = {
     val oldKafkaProps = TestUtils.createBrokerConfig(1, "localhost:2181")
-    oldKafkaProps.put(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, 1000)
+    oldKafkaProps.put(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, 1000: java.lang.Double)

Review Comment: nit: I think we can just put `1000L` here for long value.
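A side note on why an explicit type shows up in the patch at all: `java.util.Properties#put` takes `Object` parameters, and `log.cleaner.io.max.bytes.per.second` is defined as a DOUBLE config, so the test needs to hand over a boxed value (or a plain `String`, which ConfigDef-backed configs also parse). A minimal Scala sketch of both spellings, assuming only that the config accepts string values as usual:

```scala
import java.util.Properties

object PropsTypingSketch extends App {
  val props = new Properties()

  // As in the patch: the type ascription turns the Int literal into a boxed
  // java.lang.Double, which satisfies Properties#put(Object, Object).
  props.put("log.cleaner.io.max.bytes.per.second", 1000: java.lang.Double)

  // Often simpler: ConfigDef parses String values for every config type,
  // so a plain string sidesteps the boxing question entirely.
  props.put("log.cleaner.io.max.bytes.per.second", "1000")
}
```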
[GitHub] [kafka] tyamashi-oss commented on a diff in pull request #12296: KAFKA-13996: log.cleaner.io.max.bytes.per.second can be changed dynamically
tyamashi-oss commented on code in PR #12296: URL: https://github.com/apache/kafka/pull/12296#discussion_r915753889

## core/src/test/scala/unit/kafka/log/LogCleanerTest.scala:

@@ -1854,6 +1853,33 @@ class LogCleanerTest {
     } finally logCleaner.shutdown()
   }

+  @Test
+  def testReconfigureLogCleanerIoMaxBytesPerSecond(): Unit = {
+    val oldKafkaProps = TestUtils.createBrokerConfig(1, "localhost:2181")
+    oldKafkaProps.put(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, 1000)
+
+    val logCleaner = new LogCleaner(LogCleaner.cleanerConfig(new KafkaConfig(oldKafkaProps)),
+      logDirs = Array(TestUtils.tempDir()),
+      logs = new Pool[TopicPartition, UnifiedLog](),
+      logDirFailureChannel = new LogDirFailureChannel(1),
+      time = time) {
+      // shutdown() and startup() are called in LogCleaner.reconfigure().
+      // Empty startup() and shutdown() to ensure that no unnecessary log cleaner threads remain after this test.
+      override def startup(): Unit = {}
+      override def shutdown(): Unit = {}
+    }
+
+    try {
+      assertEquals(1000, logCleaner.throttler.desiredRatePerSec, s"Throttler.desiredRatePerSec should be initialized from initial `${KafkaConfig.LogCleanerIoMaxBytesPerSecondProp}` config.")
+
+      val newKafkaProps = TestUtils.createBrokerConfig(1, "localhost:2181")
+      newKafkaProps.put(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, 2000)
+
+      logCleaner.reconfigure(new KafkaConfig(oldKafkaProps), new KafkaConfig(newKafkaProps))
+
+      assertEquals(2000, logCleaner.throttler.desiredRatePerSec, s"Throttler.desiredRatePerSec should be updated with new `${KafkaConfig.LogCleanerIoMaxBytesPerSecondProp}` config.")
+    } finally logCleaner.shutdown();

Review Comment: I've fixed it. Thank you for pointing it out. https://github.com/apache/kafka/pull/12296/commits/5c727409cc03a8b459f6d30b8df888489f52d743 With this fix, the java8/scala2.12 build now succeeds. However, the `JDK 11 / Scala 2.13` build, which used to succeed, now fails. I believe this new build failure is unrelated to my changes. I ran the same command once in my local environment and the build succeeded.
```
[2022-07-07T06:22:53.589Z] > Task :streams:integrationTest FAILED
[2022-07-07T06:22:53.589Z]
[2022-07-07T06:22:53.589Z] FAILURE: Build failed with an exception.
```
https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-12296/7/pipeline/
[GitHub] [kafka] pch8388 opened a new pull request, #12389: MINOR: Fix result string
pch8388 opened a new pull request, #12389: URL: https://github.com/apache/kafka/pull/12389

Removes duplicate result strings and avoids mistakes when changing the string format.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[jira] [Updated] (KAFKA-14053) Transactional producer should bump the epoch when a batch encounters delivery timeout
[ https://issues.apache.org/jira/browse/KAFKA-14053?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Daniel Urban updated KAFKA-14053: -

Description: When a batch fails due to delivery timeout, it is possible that the batch is still in flight. Due to underlying infra issues, it is possible that an EndTxnRequest and a WriteTxnMarkerRequest are processed before the in-flight batch is processed on the leader. This can cause transactional batches to be appended to the log after the corresponding abort marker. This can cause the LSO to be infinitely blocked in the partition, or can even violate processing guarantees, as the out-of-order batch can become part of the next transaction. Because of this, the producer should skip aborting the partition, and bump the epoch to fence the in-flight requests.

was: When a batch fails due to delivery timeout, it is possible that the batch is still in flight. Due to underlying infra issues, it is possible that an EndTxnRequest and a WriteTxnMarkerRequest are processed before the in-flight batch is processed on the leader. This can cause transactional batches to be appended to the log after the corresponding abort marker. This can cause the LSO to be infinitely blocked in the partition, or can even violate processing guarantees, as the out-of-order batch can become part of the next transaction.

> Transactional producer should bump the epoch when a batch encounters delivery timeout
> -
>
> Key: KAFKA-14053
> URL: https://issues.apache.org/jira/browse/KAFKA-14053
> Project: Kafka
> Issue Type: Bug
> Reporter: Daniel Urban
> Assignee: Daniel Urban
> Priority: Major
>
> When a batch fails due to delivery timeout, it is possible that the batch is still in flight. Due to underlying infra issues, it is possible that an EndTxnRequest and a WriteTxnMarkerRequest are processed before the in-flight batch is processed on the leader. This can cause transactional batches to be appended to the log after the corresponding abort marker.
> This can cause the LSO to be infinitely blocked in the partition, or can even violate processing guarantees, as the out-of-order batch can become part of the next transaction.
> Because of this, the producer should skip aborting the partition, and bump the epoch to fence the in-flight requests.
[jira] [Created] (KAFKA-14053) Transactional producer should bump the epoch when a batch encounters delivery timeout
Daniel Urban created KAFKA-14053: Summary: Transactional producer should bump the epoch when a batch encounters delivery timeout Key: KAFKA-14053 URL: https://issues.apache.org/jira/browse/KAFKA-14053 Project: Kafka Issue Type: Bug Reporter: Daniel Urban Assignee: Daniel Urban

When a batch fails due to delivery timeout, it is possible that the batch is still in flight. Due to underlying infra issues, it is possible that an EndTxnRequest and a WriteTxnMarkerRequest are processed before the in-flight batch is processed on the leader. This can cause transactional batches to be appended to the log after the corresponding abort marker.

This can cause the LSO to be infinitely blocked in the partition, or can even violate processing guarantees, as the out-of-order batch can become part of the next transaction.
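To make the failure window concrete, here is a minimal Scala sketch of a transactional produce loop using the public producer API (topic name, transactional id, and serializers are illustrative, and fatal-error handling is omitted). Today the abort on a delivery timeout only writes the abort marker; the ticket proposes that this path also bump the producer epoch so a still-in-flight batch cannot land after the marker:

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.KafkaException
import org.apache.kafka.common.serialization.StringSerializer

object TxnDeliveryTimeoutSketch extends App {
  val props = new Properties()
  props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "demo-txn") // illustrative id
  props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "120000")

  val producer = new KafkaProducer(props, new StringSerializer, new StringSerializer)
  producer.initTransactions()
  try {
    producer.beginTransaction()
    producer.send(new ProducerRecord("demo-topic", "key", "value"))
    producer.commitTransaction()
  } catch {
    case _: KafkaException =>
      // A batch that hit delivery.timeout.ms fails the transaction and
      // surfaces here. abortTransaction() writes the abort marker, but the
      // timed-out batch may still be in flight; per KAFKA-14053 the producer
      // should additionally bump its epoch at this point to fence it.
      producer.abortTransaction()
  } finally {
    producer.close()
  }
}
```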
[GitHub] [kafka] omkreddy commented on a diff in pull request #12359: KAFKA-13983: Fail the creation with "/" in resource name in zk ACL
omkreddy commented on code in PR #12359: URL: https://github.com/apache/kafka/pull/12359#discussion_r915675978

## core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala:

@@ -209,6 +213,9 @@ class AclAuthorizer extends Authorizer with Logging {
       throw new UnsupportedVersionException(s"Adding ACLs on prefixed resource patterns requires " +
         s"${KafkaConfig.InterBrokerProtocolVersionProp} of $IBP_2_0_IV1 or greater")
     }
+    if (inValidAclBindingResourceName(aclBinding.pattern().name())) {

Review Comment: Can we move this to the `validateAclBinding` method?
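For readers following along, the helper itself is not shown in the hunk. A plausible shape, assuming the intent of KAFKA-13983 (reject resource names that would break the ZooKeeper node layout backing AclAuthorizer); the body below is a guess for illustration, not the actual patch:

```scala
// Hypothetical sketch of the helper under discussion: AclAuthorizer keeps
// ACLs in ZooKeeper nodes keyed by resource name, so a literal resource
// name containing the ZK path separator '/' would corrupt the node layout.
object ResourceNameValidation {
  def inValidAclBindingResourceName(resourceName: String): Boolean =
    resourceName.contains("/")
}
```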
[GitHub] [kafka] divijvaidya commented on pull request #12381: KAFKA-13474: Allow reconfiguration of SSL certs for broker to controller connection
divijvaidya commented on PR #12381: URL: https://github.com/apache/kafka/pull/12381#issuecomment-1177319338

> Sorry, looks like the `match` case still needs a default case:
>
> ```
> [Error] /home/jenkins/workspace/Kafka_kafka-pr_PR-12381/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala:190:7: match may not be exhaustive.
> ```

My bad, Luke! I did a hasty change yesterday without verifying the build. I have fixed it now. I appreciate your quick responses on this PR. If you get time, please take a look at my other open PRs as well: https://github.com/apache/kafka/pulls/divijvaidya
[GitHub] [kafka] elkkhan commented on pull request #12320: KAFKA-13702: Connect RestClient overrides response status code on request failure
elkkhan commented on PR #12320: URL: https://github.com/apache/kafka/pull/12320#issuecomment-1177283596 @C0urante thanks, addressed the comments. Hopefully checkstyle is all good now - I wasn't able to run the Gradle task locally for some reason (it kept getting stuck), but now it's working and reporting no issues.
[GitHub] [kafka] elkkhan commented on a diff in pull request #12320: KAFKA-13702: Connect RestClient overrides response status code on request failure
elkkhan commented on code in PR #12320: URL: https://github.com/apache/kafka/pull/12320#discussion_r915638083

## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestClientTest.java:

@@ -0,0 +1,242 @@
+package org.apache.kafka.connect.runtime.rest;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.connect.runtime.rest.entities.ErrorMessage;
+import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
+import org.eclipse.jetty.client.HttpClient;
+import org.eclipse.jetty.client.api.ContentResponse;
+import org.eclipse.jetty.client.api.Request;
+import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import javax.crypto.SecretKey;
+import javax.ws.rs.core.Response;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Objects;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@RunWith(Enclosed.class)
+public class RestClientTest {
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    private static final TypeReference<TestDTO> TEST_TYPE = new TypeReference<TestDTO>() {
+    };
+    private static final SecretKey MOCK_SECRET_KEY = getMockSecretKey();
+
+    private static void assertIsInternalServerError(ConnectRestException e) {
+        assertEquals(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), e.statusCode());
+        assertEquals(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), e.errorCode());
+    }
+
+    private static SecretKey getMockSecretKey() {
+        SecretKey mockKey = mock(SecretKey.class);
+        when(mockKey.getFormat()).thenReturn("RAW"); // supported format by
+        when(mockKey.getEncoded()).thenReturn("SomeKey".getBytes(StandardCharsets.UTF_8));
+        return mockKey;
+    }
+
+    private static RestClient.HttpResponse<TestDTO> httpRequest(HttpClient httpClient, String requestSignatureAlgorithm) {
+        return RestClient.httpRequest(
+                httpClient,
+                "https://localhost:1234/api/endpoint",
+                "GET",
+                null,
+                new TestDTO("requestBodyData"),
+                TEST_TYPE,
+                MOCK_SECRET_KEY,
+                requestSignatureAlgorithm);
+    }
+
+    private static RestClient.HttpResponse<TestDTO> httpRequest(HttpClient httpClient) {
+        String validRequestSignatureAlgorithm = "HmacSHA1";
+        return httpRequest(httpClient, validRequestSignatureAlgorithm);
+    }
+
+    @RunWith(Parameterized.class)
+    public static class RequestFailureParameterizedTest {
+        private final HttpClient httpClient = mock(HttpClient.class);
+
+        @Parameterized.Parameter
+        public Throwable requestException;
+
+        @Parameterized.Parameters
+        public static Collection<Object[]> requestExceptions() {
+            return Arrays.asList(new Object[][]{
+                    {new InterruptedException()},
+                    {new ExecutionException(null)},
+                    {new TimeoutException()}
+            });
+        }
+
+        private static Request buildThrowingMockRequest(Throwable t) throws ExecutionException, InterruptedException, TimeoutException {
+            Request req = mock(Request.class);
+            when(req.send()).thenThrow(t);

Review Comment: nice catch, thx. setupHttpClient does this but I forgot to do it here and didn't notice the NPEs. Fixed.
[GitHub] [kafka] elkkhan commented on a diff in pull request #12320: KAFKA-13702: Connect RestClient overrides response status code on request failure
elkkhan commented on code in PR #12320: URL: https://github.com/apache/kafka/pull/12320#discussion_r915637537

## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestClientTest.java:

@@ -0,0 +1,242 @@
+        @Test
+        public void testFailureDuringRequestCausesInternalServerError() throw
[GitHub] [kafka] elkkhan commented on a diff in pull request #12320: KAFKA-13702: Connect RestClient overrides response status code on request failure
elkkhan commented on code in PR #12320: URL: https://github.com/apache/kafka/pull/12320#discussion_r915637006

## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestClientTest.java:

@@ -0,0 +1,242 @@
+    @RunWith(Parameterized.class)
+    public static class RequestFailureParameterizedTest {
+        private final HttpClient httpClient = mock(HttpClient.class);

Review Comment: nice, thanks - didn't know this. Fixed.
[GitHub] [kafka] elkkhan commented on a diff in pull request #12320: KAFKA-13702: Connect RestClient overrides response status code on request failure
elkkhan commented on code in PR #12320: URL: https://github.com/apache/kafka/pull/12320#discussion_r915628889

## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestClientTest.java:

@@ -0,0 +1,242 @@
+        private static Request buildThrowingMockRequest(Throwable t) throws ExecutionException, InterruptedException, TimeoutException {
+            Request req = mock(Request.class);
+            when(req.send()).thenThrow(t);

Review Comment: @C0urante I believe this is already in the latest commit, any chance you were referring to the previous comm
[jira] [Commented] (KAFKA-6221) ReplicaFetcherThread throws UnknownTopicOrPartitionException on topic creation
[ https://issues.apache.org/jira/browse/KAFKA-6221?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17563626#comment-17563626 ] Hang HOU commented on KAFKA-6221: -

I ran into this problem again in Kafka 0.10.0. One more thing: if I create several topics, each with only one partition, and then send 3 million records to every topic, some topics never lose data during produce, but the remaining topics consistently lose about 1/7 of the total data :(

> ReplicaFetcherThread throws UnknownTopicOrPartitionException on topic creation
> ---
>
> Key: KAFKA-6221
> URL: https://issues.apache.org/jira/browse/KAFKA-6221
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 0.10.1.1, 0.10.2.0, 0.10.2.1, 0.11.0.1, 1.0.0
> Environment: RHEL 7
> Reporter: Alex Dunayevsky
> Priority: Minor
> Original Estimate: 336h
> Remaining Estimate: 336h
>
> This issue appeared to happen frequently on 0.10.2.0.
> On 0.10.2.1 and 1.0.0 it's way harder to reproduce.
> We'll focus on reproducing it on 0.10.2.1 and 1.0.0.
> *TOPOLOGY:* 3 brokers, 1 zk.
> *REPRODUCING STRATEGY:* Create a few dozen topics (say, 40) one by one, each with replication factor 2. Number of partitions, generally, does not matter but, for easier reproduction, should not be too small (around 30 or so).
> *CREATE 40 TOPICS:*
> {code:java}
> for i in {1..40}; do bin/kafka-topics.sh --create --topic "topic${i}_p28_r2" --partitions 28 --replication-factor 2 --zookeeper :2165; done
> {code}
> *ERRORS*
> {code:java}
> *BROKER 1*
> [2017-11-15 16:46:00,853] ERROR [ReplicaFetcherThread-0-2], Error for partition [topic1_p28_r2,27] to broker 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition. (kafka.server.ReplicaFetcherThread)
> [2017-11-15 16:46:00,853] ERROR [ReplicaFetcherThread-0-2], Error for partition [topic1_p28_r2,27] to broker 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition. (kafka.server.ReplicaFetcherThread)
> [2017-11-15 16:46:00,854] ERROR [ReplicaFetcherThread-0-2], Error for partition [topic1_p28_r2,9] to broker 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition. (kafka.server.ReplicaFetcherThread)
> [2017-11-15 16:46:00,854] ERROR [ReplicaFetcherThread-0-2], Error for partition [topic1_p28_r2,9] to broker 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition. (kafka.server.ReplicaFetcherThread)
> [2017-11-15 16:46:00,854] ERROR [ReplicaFetcherThread-0-2], Error for partition [topic1_p28_r2,3] to broker 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition. (kafka.server.ReplicaFetcherThread)
> [2017-11-15 16:46:00,854] ERROR [ReplicaFetcherThread-0-2], Error for partition [topic1_p28_r2,3] to broker 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition. (kafka.server.ReplicaFetcherThread)
> [2017-11-15 16:46:00,854] ERROR [ReplicaFetcherThread-0-2], Error for partition [topic1_p28_r2,15] to broker 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition. (kafka.server.ReplicaFetcherThread)
> [2017-11-15 16:46:00,854] ERROR [ReplicaFetcherThread-0-2], Error for partition [topic1_p28_r2,15] to broker 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition. (kafka.server.ReplicaFetcherThread)
> [2017-11-15 16:46:00,854] ERROR [ReplicaFetcherThread-0-2], Error for partition [topic1_p28_r2,21] to broker 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition. (kafka.server.ReplicaFetcherThread)
> [2017-11-15 16:46:00,854] ERROR [ReplicaFetcherThread-0-2], Error for partition [topic1_p28_r2,21] to broker 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition. (kafka.server.ReplicaFetcherThread)
> *BROKER 2*
> [2017-11-15 16:46:36,408] ERROR [ReplicaFetcherThread-0-3], Error for partition [topic20_p28_r2,12] to broker 3:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition. (kafka.server.ReplicaFetcherThread)
> [2017-11-15 16:46:36,408] ERROR [ReplicaFetcherThread-0-3], Error for partition [topic20_p28_r2,12] to broker 3:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.
[GitHub] [kafka] showuon commented on pull request #12381: KAFKA-13474: Allow reconfiguration of SSL certs for broker to controller connection
showuon commented on PR #12381: URL: https://github.com/apache/kafka/pull/12381#issuecomment-1177230649 Sorry, looks like the `match` case still needs a default case:
```
[Error] /home/jenkins/workspace/Kafka_kafka-pr_PR-12381/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala:190:7: match may not be exhaustive.
```
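For readers unfamiliar with the error above, it comes from the Scala compiler's exhaustiveness checker. A generic illustration (the types and method below are made up for this sketch, not from `BrokerToControllerChannelManager`):

```scala
sealed trait ControllerSource
final case class ZkController(host: String)   extends ControllerSource
final case class RaftController(host: String) extends ControllerSource
case object NoControllerYet                   extends ControllerSource

object MatchSketch {
  def describe(source: ControllerSource): String = source match {
    case ZkController(host)   => s"zk controller at $host"
    case RaftController(host) => s"raft controller at $host"
    // Without this catch-all (or an explicit NoControllerYet case), scalac
    // reports: "match may not be exhaustive. It would fail on the
    // following input: NoControllerYet"
    case _                    => "no controller elected yet"
  }
}
```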
[GitHub] [kafka] dengziming commented on pull request #12274: KAFKA-13959: Controller should unfence Broker with busy metadata log
dengziming commented on PR #12274: URL: https://github.com/apache/kafka/pull/12274#issuecomment-1177222867

> Do you have a diff for this solution?

@jsancio I haven't fleshed it out yet; the basic idea is shown in this PR: only read up to `Isolation.COMMITTED` when reading from an observer, and invoke `fetchPurgatory.maybeComplete(highWatermark.offset, currentTimeMs)` in `onUpdateLeaderHighWatermark`.

> maybe we can try the original proposed one?

@showuon That solution only helps if the problem is that the heartbeat period is much longer than the NoOpRecord interval. It may not work now, since the main problem comes from `RaftClient.fetchPurgatory`, so we should try a new solution.
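A rough sketch of the second half of that idea, with names modeled on the discussion rather than the actual `KafkaRaftClient` internals: whenever the leader advances its high watermark, complete any fetches parked in the purgatory at or below the new watermark instead of letting them wait out their timeout.

```scala
// Assumed shapes, for illustration only; the real code lives in the raft module.
trait FetchPurgatory {
  // Completes every waiting fetch whose target offset is <= offset.
  def maybeComplete(offset: Long, currentTimeMs: Long): Unit
}

final case class LogOffsetMetadata(offset: Long)

class LeaderStateSketch(fetchPurgatory: FetchPurgatory) {
  @volatile private var highWatermark: LogOffsetMetadata = LogOffsetMetadata(0L)

  def onUpdateLeaderHighWatermark(newHighWatermark: LogOffsetMetadata,
                                  currentTimeMs: Long): Unit = {
    highWatermark = newHighWatermark
    // Wake up observer fetches that were blocked because nothing past the
    // old high watermark was committed; this is what would let a busy
    // metadata log still unfence brokers promptly (KAFKA-13959).
    fetchPurgatory.maybeComplete(highWatermark.offset, currentTimeMs)
  }
}
```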
[GitHub] [kafka] zigarn commented on pull request #12175: KIP-840: Config file option for MessageReader/MessageFormatter in ConsoleProducer/ConsoleConsumer
zigarn commented on PR #12175: URL: https://github.com/apache/kafka/pull/12175#issuecomment-1177208913 @dajac KIP is now accepted. Next step is to validate this PR.
[GitHub] [kafka] zigarn commented on a diff in pull request #12175: KIP-840: Config file option for MessageReader/MessageFormatter in ConsoleProducer/ConsoleConsumer
zigarn commented on code in PR #12175: URL: https://github.com/apache/kafka/pull/12175#discussion_r915565817

## core/src/main/scala/kafka/tools/ConsoleConsumer.scala:

@@ -248,6 +248,10 @@ object ConsoleConsumer extends Logging {
       .withRequiredArg
       .describedAs("prop")
       .ofType(classOf[String])
+    val messageFormatterConfigOpt = parser.accepts("config", s"Config properties file to initialize the message formatter. Note that $messageFormatterArgOpt takes precedence over this config.")

Review Comment: `--consumer.config` is used to set up the consumer; this option sets up the message formatter. It's the same distinction as between `--consumer-property` and `--property`.
[GitHub] [kafka] etolbakov opened a new pull request, #12388: KAFKA-14013: Limit the length of the `reason` field sent on the wire
etolbakov opened a new pull request, #12388: URL: https://github.com/apache/kafka/pull/12388 Hello David @dajac, I found an open JIRA ticket - https://issues.apache.org/jira/browse/KAFKA-14013 - and decided to come up with a suggestion. Could you please take a look if you have a spare minute? Happy to adjust the changes if required. -- Regards, Eugene
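For context, KAFKA-14013 asks the client to cap the free-form `reason` string before it is copied into JoinGroup/LeaveGroup requests. A minimal Scala sketch of that kind of guard; the 255-character cap and the helper name are assumptions for illustration, not the contents of this PR:

```scala
object ReasonSketch {
  // Assumed limit; the actual value is whatever the patch settles on.
  private val MaxReasonLength = 255

  // Truncates an over-long, user-supplied reason before it goes on the wire.
  def truncateIfRequired(reason: String): String =
    if (reason == null || reason.length <= MaxReasonLength) reason
    else reason.substring(0, MaxReasonLength)
}
```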
[GitHub] [kafka] dajac merged pull request #12382: MINOR: kafka system tests should support larger EBS volumes for newer instances
dajac merged PR #12382: URL: https://github.com/apache/kafka/pull/12382
[GitHub] [kafka] dengziming commented on a diff in pull request #12265: KAFKA-13968: Fix 3 major bugs of KRaft snapshot generating
dengziming commented on code in PR #12265: URL: https://github.com/apache/kafka/pull/12265#discussion_r915532837

## core/src/main/scala/kafka/server/metadata/BrokerMetadataSnapshotter.scala:

@@ -45,26 +45,37 @@ class BrokerMetadataSnapshotter(
    */
   private var _currentSnapshotOffset = -1L

+  /**
+   * The offset of the newest snapshot, or -1 if there hasn't been one. Accessed only under
+   * the object lock.
+   */
+  private var _latestSnapshotOffset = -1L

Review Comment: This seems to be an existing problem. I checked the logic of `QuorumController.SnapshotGeneratorManager`: it checks the result of `raftClient.createSnapshot` and just skips it if it's empty; however, on the broker side, `BrokerSnapshotWriterBuilder` throws an exception on an empty snapshot. How about making it consistent with the controller code?