[GitHub] [kafka] vamossagar12 commented on a diff in pull request #12297: KAFKA-13846: Follow up PR to address review comments

2022-07-07 Thread GitBox


vamossagar12 commented on code in PR #12297:
URL: https://github.com/apache/kafka/pull/12297#discussion_r916462553


##
docs/upgrade.html:
##
@@ -19,6 +19,56 @@
 
 

[jira] [Updated] (KAFKA-13152) Replace "buffered.records.per.partition" with "input.buffer.max.bytes"

2022-07-07 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-13152:

Labels: kip  (was: needs-kip)

> Replace "buffered.records.per.partition" with "input.buffer.max.bytes" 
> ---
>
> Key: KAFKA-13152
> URL: https://issues.apache.org/jira/browse/KAFKA-13152
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Sagar Rao
>Priority: Major
>  Labels: kip
>
> The current config "buffered.records.per.partition" controls the maximum number 
> of records to bookkeep per partition; when it is exceeded, we pause fetching from 
> that partition. However, this config has two issues:
> * It's a per-partition config, so the total memory consumed depends on 
> the dynamic number of partitions assigned.
> * Record size can vary from case to case.
> Hence it's hard to bound the memory usage of this buffering. We should 
> consider deprecating that config in favor of a global one, e.g. "input.buffer.max.bytes", 
> which controls how many bytes in total are allowed to be buffered. This is 
> doable since we buffer the raw records in .
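For illustration only, a minimal sketch of how an application might set the proposed bound once such a config exists; the name "input.buffer.max.bytes" comes from this ticket, while the 512 MB value and the plain string key (rather than a dedicated StreamsConfig constant) are assumptions of the sketch.

{code:java}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class InputBufferBoundSketch {
    public static Properties streamsProps() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "buffer-bound-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Proposed: one global byte bound across all assigned partitions, replacing the
        // per-partition record-count bound "buffered.records.per.partition".
        props.put("input.buffer.max.bytes", String.valueOf(512L * 1024 * 1024));
        return props;
    }
}
{code}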



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Reopened] (KAFKA-13152) Replace "buffered.records.per.partition" with "input.buffer.max.bytes"

2022-07-07 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reopened KAFKA-13152:
-

> Replace "buffered.records.per.partition" with "input.buffer.max.bytes" 
> ---
>
> Key: KAFKA-13152
> URL: https://issues.apache.org/jira/browse/KAFKA-13152
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Sagar Rao
>Priority: Major
>  Labels: needs-kip
> Fix For: 3.3.0
>
>
> The current config "buffered.records.per.partition" controls the maximum number 
> of records to bookkeep per partition; when it is exceeded, we pause fetching from 
> that partition. However, this config has two issues:
> * It's a per-partition config, so the total memory consumed depends on 
> the dynamic number of partitions assigned.
> * Record size can vary from case to case.
> Hence it's hard to bound the memory usage of this buffering. We should 
> consider deprecating that config in favor of a global one, e.g. "input.buffer.max.bytes", 
> which controls how many bytes in total are allowed to be buffered. This is 
> doable since we buffer the raw records in .



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-13152) Replace "buffered.records.per.partition" with "input.buffer.max.bytes"

2022-07-07 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-13152:

Fix Version/s: (was: 3.3.0)

> Replace "buffered.records.per.partition" with "input.buffer.max.bytes" 
> ---
>
> Key: KAFKA-13152
> URL: https://issues.apache.org/jira/browse/KAFKA-13152
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Sagar Rao
>Priority: Major
>  Labels: needs-kip
>
> The current config "buffered.records.per.partition" controls the maximum number 
> of records to bookkeep per partition; when it is exceeded, we pause fetching from 
> that partition. However, this config has two issues:
> * It's a per-partition config, so the total memory consumed depends on 
> the dynamic number of partitions assigned.
> * Record size can vary from case to case.
> Hence it's hard to bound the memory usage of this buffering. We should 
> consider deprecating that config in favor of a global one, e.g. "input.buffer.max.bytes", 
> which controls how many bytes in total are allowed to be buffered. This is 
> doable since we buffer the raw records in .



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] guozhangwang commented on a diff in pull request #12370: KAFKA-13785: [10/N][emit final] more unit test for session store and disable cache for emit final sliding window

2022-07-07 Thread GitBox


guozhangwang commented on code in PR #12370:
URL: https://github.com/apache/kafka/pull/12370#discussion_r916385802


##
streams/src/test/java/org/apache/kafka/streams/state/internals/InMemorySessionStoreTest.java:
##
@@ -55,6 +55,10 @@  SessionStore buildSessionStore(final long 
retentionPeriod,
 valueSerde).build();
 }
 
+StoreType getStoreType() {

Review Comment:
   Sounds great!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang opened a new pull request, #12391: KAFKA-10199: Add task updater metrics

2022-07-07 Thread GitBox


guozhangwang opened a new pull request, #12391:
URL: https://github.com/apache/kafka/pull/12391

   Should only be reviewed after https://github.com/apache/kafka/pull/12387.
   
   Needs discussion on KIPs.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lihaosky commented on a diff in pull request #12370: KAFKA-13785: [10/N][emit final] more unit test for session store and disable cache for emit final sliding window

2022-07-07 Thread GitBox


lihaosky commented on code in PR #12370:
URL: https://github.com/apache/kafka/pull/12370#discussion_r916381888


##
streams/src/test/java/org/apache/kafka/streams/state/internals/InMemorySessionStoreTest.java:
##
@@ -55,6 +55,10 @@  SessionStore buildSessionStore(final long 
retentionPeriod,
 valueSerde).build();
 }
 
+StoreType getStoreType() {

Review Comment:
   For `shouldRemoveExpired`, the RocksDB version has different params for the 
session window.
   
   For `shouldNotExpireFromOpenIterator`, it's not supported in 
RocksDBSessionStore.
   
   For `shouldMatchPositionAfterPut`, the RocksDB version has a different cast 
than `InMemorySessionStore`.
   
   So we could put `shouldRemoveExpired` and `shouldMatchPositionAfterPut` in the 
parent class and add different logic depending on store type. We can also leave 
them here. wdyt?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ijuma commented on pull request #12380: MINOR: Get rid of agent checks in Jenkinsfile

2022-07-07 Thread GitBox


ijuma commented on PR #12380:
URL: https://github.com/apache/kafka/pull/12380#issuecomment-1178420987

   +1 for removing these builds from PRs. Generally, we should only include 
widely used platforms where we also have stable build machines.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang commented on pull request #12370: KAFKA-13785: [10/N][emit final] more unit test for session store and disable cache for emit final sliding window

2022-07-07 Thread GitBox


guozhangwang commented on PR #12370:
URL: https://github.com/apache/kafka/pull/12370#issuecomment-1178363643

   @lihaosky Thanks for your PR, I made a pass on it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a diff in pull request #12370: KAFKA-13785: [10/N][emit final] more unit test for session store and disable cache for emit final sliding window

2022-07-07 Thread GitBox


guozhangwang commented on code in PR #12370:
URL: https://github.com/apache/kafka/pull/12370#discussion_r916291502


##
streams/src/test/java/org/apache/kafka/streams/state/internals/InMemorySessionStoreTest.java:
##
@@ -55,6 +55,10 @@  SessionStore buildSessionStore(final long 
retentionPeriod,
 valueSerde).build();
 }
 
+StoreType getStoreType() {

Review Comment:
   Just wondering: since we are leveraging the inherited abstract session 
bytes store test, are there any test cases below that can be moved there or 
are even duplicates?



##
streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java:
##
@@ -827,6 +827,139 @@ public void shouldPutAndBackwardFetchWithPrefix() {
 }
 }
 
+@Test
+public void shouldFetchSessionForSingleKey() {
+// Only for TimeFirstSessionKeySchema schema
+if (!(getBaseSchema() instanceof TimeFirstSessionKeySchema)) {
+return;
+}
+
+final String keyA = "a";
+final String keyB = "b";
+final String keyC = "c";
+
+final StateSerdes stateSerdes = 
StateSerdes.withBuiltinTypes("dummy", String.class, Long.class);
+final Bytes key1 = 
Bytes.wrap(stateSerdes.keySerializer().serialize("dummy", keyA));
+final Bytes key2 = 
Bytes.wrap(stateSerdes.keySerializer().serialize("dummy", keyB));
+final Bytes key3 = 
Bytes.wrap(stateSerdes.keySerializer().serialize("dummy", keyC));
+
+final byte[] expectedValue1 = serializeValue(10);
+final byte[] expectedValue2 = serializeValue(50);
+final byte[] expectedValue3 = serializeValue(100);
+final byte[] expectedValue4 = serializeValue(200);
+
+bytesStore.put(serializeKey(new Windowed<>(keyA, windows[0])), 
expectedValue1);
+bytesStore.put(serializeKey(new Windowed<>(keyA, windows[1])), 
expectedValue2);
+bytesStore.put(serializeKey(new Windowed<>(keyB, windows[2])), 
expectedValue3);
+bytesStore.put(serializeKey(new Windowed<>(keyC, windows[3])), 
expectedValue4);
+
+final byte[] value1 = ((RocksDBTimeOrderedSessionSegmentedBytesStore) 
bytesStore).fetchSession(

Review Comment:
   Could we also add a "miss" case for `fetchSession` here?
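   Something like the sketch below, perhaps. This is only an illustration: the exact `fetchSession` signature is cut off in the diff above, so the (key, session start, session end) argument order and the null-on-miss behaviour are assumptions here, as is the plain JUnit `assertNull`.
   
   ```java
   // Hypothetical miss case: fetch a key that was never put and expect no stored session.
   final Bytes missingKey = Bytes.wrap(stateSerdes.keySerializer().serialize("dummy", "no-such-key"));
   final byte[] missing = ((RocksDBTimeOrderedSessionSegmentedBytesStore) bytesStore).fetchSession(
       missingKey, windows[0].start(), windows[0].end());
   assertNull(missing);
   ```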



##
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java:
##
@@ -62,6 +57,11 @@ public static Collection getKeySchema() {
 });
 }
 
+@Override
+StoreType getStoreType() {

Review Comment:
   This is for line 52 above: should we rename that function to a more 
meaningful one? maybe `getParamStoreType`?



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java:
##
@@ -289,7 +289,10 @@ private  StoreBuilder> 
materialize(final MaterializedInt
 // do not enable cache if the emit final strategy is used
 if (materialized.cachingEnabled() && emitStrategy.type() != 
EmitStrategy.StrategyType.ON_WINDOW_CLOSE) {
 builder.withCachingEnabled();
+} else {
+builder.withCachingDisabled();

Review Comment:
   We only set the default at the Materialized level, so the if condition should 
be sufficient even if the default is to turn caching on, but nevertheless making 
it explicit seems fine.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] etolbakov commented on a diff in pull request #12388: KAFKA-14013: Limit the length of the `reason` field sent on the wire

2022-07-07 Thread GitBox


etolbakov commented on code in PR #12388:
URL: https://github.com/apache/kafka/pull/12388#discussion_r916314304


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##
@@ -1112,9 +1121,10 @@ public synchronized RequestFuture 
maybeLeaveGroup(String leaveReason) {
 // attempt any resending if the request fails or times out.
 log.info("Member {} sending LeaveGroup request to coordinator {} 
due to {}",
 generation.memberId, coordinator, leaveReason);
+final String reason = truncateIfRequired(leaveReason);
 LeaveGroupRequest.Builder request = new LeaveGroupRequest.Builder(
 rebalanceConfig.groupId,
-Collections.singletonList(new 
MemberIdentity().setMemberId(generation.memberId).setReason(leaveReason))
+Collections.singletonList(new 
MemberIdentity().setMemberId(generation.memberId).setReason(reason))

Review Comment:
   makes sense to inline, thanks!
   As for the test, though I'm still catching up with the code base, I'll check 
if the `request` argument of the `send` method could be asserted.
   
https://github.com/apache/kafka/blob/64ac302b1c6baa4b28e6fb90776985ac242d41e3/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1130
   Does that sound fine?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jnh5y commented on pull request #12388: KAFKA-14013: Limit the length of the `reason` field sent on the wire

2022-07-07 Thread GitBox


jnh5y commented on PR #12388:
URL: https://github.com/apache/kafka/pull/12388#issuecomment-1178280579

   > > @etolbakov Thanks for the PR. When the jira is assigned, it usually 
means that someone is working on it. In this case, I know that @jnh5y wanted to 
fix this. @jnh5y If you haven't started yet, we could perhaps review this one.
   > 
   > Thank you @dajac for finding time for a review! Sorry for creating 
confusion, indeed I should have asked you or @jnh5y first (was too excited 
about the change 😅). Noted for the future. I will address your suggestions ASAP.
   
   No worries, it is all yours!  I could have assigned the ticket to myself, 
etc.  


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] etolbakov commented on a diff in pull request #12388: KAFKA-14013: Limit the length of the `reason` field sent on the wire

2022-07-07 Thread GitBox


etolbakov commented on code in PR #12388:
URL: https://github.com/apache/kafka/pull/12388#discussion_r916314304


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##
@@ -1112,9 +1121,10 @@ public synchronized RequestFuture 
maybeLeaveGroup(String leaveReason) {
 // attempt any resending if the request fails or times out.
 log.info("Member {} sending LeaveGroup request to coordinator {} 
due to {}",
 generation.memberId, coordinator, leaveReason);
+final String reason = truncateIfRequired(leaveReason);
 LeaveGroupRequest.Builder request = new LeaveGroupRequest.Builder(
 rebalanceConfig.groupId,
-Collections.singletonList(new 
MemberIdentity().setMemberId(generation.memberId).setReason(leaveReason))
+Collections.singletonList(new 
MemberIdentity().setMemberId(generation.memberId).setReason(reason))

Review Comment:
   makes sense to inline, thanks!
   As for the test, though I'm still catching up with the code base, I'll check 
if the argument of the `send` method could be asserted.
   Does that sound fine?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] etolbakov commented on pull request #12388: KAFKA-14013: Limit the length of the `reason` field sent on the wire

2022-07-07 Thread GitBox


etolbakov commented on PR #12388:
URL: https://github.com/apache/kafka/pull/12388#issuecomment-1178254261

   > @etolbakov Thanks for the PR. When the jira is assigned, it usually means 
that someone is working on it. In this case, I know that @jnh5y wanted to fix 
this. @jnh5y If you haven't started yet, we could perhaps review this one.
   
   Thank you @dajac for finding time for a review! 
   Sorry for creating confusion, indeed I should have asked you or @jnh5y first 
(was too excited about the change 😅). Noted for the future.
   I will address your suggestions ASAP. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-14050) Older clients cannot deserialize ApiVersions response with finalized feature epoch

2022-07-07 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14050?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-14050.
-
Resolution: Not A Problem

I'm going to close this since the incompatible schema change did not affect any 
released versions. 

> Older clients cannot deserialize ApiVersions response with finalized feature 
> epoch
> --
>
> Key: KAFKA-14050
> URL: https://issues.apache.org/jira/browse/KAFKA-14050
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Priority: Blocker
> Fix For: 3.3.0
>
>
> When testing kraft locally, we encountered this exception from an older 
> client:
> {code:java}
> [ERROR] 2022-07-05 16:45:01,165 [kafka-admin-client-thread | 
> adminclient-1394] org.apache.kafka.common.utils.KafkaThread 
> lambda$configureThread$0 - Uncaught exception in thread 
> 'kafka-admin-client-thread | adminclient-1394':
> org.apache.kafka.common.protocol.types.SchemaException: Error reading field 
> 'api_keys': Error reading array of size 1207959552, only 579 bytes available
> at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:118)
> at 
> org.apache.kafka.common.protocol.ApiKeys.parseResponse(ApiKeys.java:378)
> at 
> org.apache.kafka.common.protocol.ApiKeys$1.parseResponse(ApiKeys.java:187)
> at 
> org.apache.kafka.clients.NetworkClient$DefaultClientInterceptor.parseResponse(NetworkClient.java:1333)
> at 
> org.apache.kafka.clients.NetworkClient.parseStructMaybeUpdateThrottleTimeMetrics(NetworkClient.java:752)
> at 
> org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:888)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:577)
> at 
> org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1329)
> at 
> org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1260)
> at java.base/java.lang.Thread.run(Thread.java:832) {code}
> The cause appears to be from a change to the type of the 
> `FinalizedFeaturesEpoch` field in the `ApiVersions` response from int32 to 
> int64: 
> [https://github.com/apache/kafka/pull/9001/files#diff-32006e8becae918416debdb9ac76bf8a1ad12b83aaaf5f8819b6ecc00c1fb56bR58.]
> Fortunately, `FinalizedFeaturesEpoch` is a tagged field, so we can fix this 
> by creating a new field. We will have to leave the existing tag in the 
> protocol spec and consider it dead.
> Credit for this find goes to [~dajac] .



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] hachikuji opened a new pull request, #12390: KAFKA-14055; Txn markers should not be removed by matching records in the offset map

2022-07-07 Thread GitBox


hachikuji opened a new pull request, #12390:
URL: https://github.com/apache/kafka/pull/12390

   When cleaning a topic with transactional data, if the keys used in the user 
data happen to conflict with the keys in the transaction markers, it is 
possible for the markers to get removed before the corresponding data from the 
transaction is removed. This results in a hanging transaction or the loss of 
the transaction's atomicity, since the data would effectively get bundled into the 
next transaction. Currently control records are excluded when building the offset map, but not 
when doing the cleaning. This patch fixes the problem by checking for control 
batches in the `shouldRetainRecord` callback.
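   For illustration, a rough Java sketch of the retention rule this describes (the broker's actual code is the Scala LogCleaner; the `Map<ByteBuffer, Long>` below merely stands in for the cleaner's offset map, and tombstone/delete-horizon handling is omitted):
   
   ```java
   import java.nio.ByteBuffer;
   import java.util.Map;
   import org.apache.kafka.common.record.Record;
   import org.apache.kafka.common.record.RecordBatch;
   
   final class RetainRecordSketch {
       // A record in a control batch (a transaction marker) is always retained here, so a user
       // record with a colliding key can no longer shadow it; markers are retired by separate
       // logic once the transaction's data has been cleaned.
       static boolean shouldRetainRecord(final RecordBatch batch,
                                         final Record record,
                                         final Map<ByteBuffer, Long> latestOffsetByKey) {
           if (batch.isControlBatch()) {
               return true;
           }
           if (!record.hasKey()) {
               return false; // unkeyed records are not retained when compacting
           }
           final Long latest = latestOffsetByKey.get(record.key());
           // keep the record only if the offset map saw no newer record with the same key
           return latest == null || record.offset() >= latest;
       }
   }
   ```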
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on pull request #12091: KAFKA-12943: update aggregating documentation

2022-07-07 Thread GitBox


mjsax commented on PR #12091:
URL: https://github.com/apache/kafka/pull/12091#issuecomment-1178222919

   Thanks for the PR! Merged to `trunk`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax merged pull request #12091: KAFKA-12943: update aggregating documentation

2022-07-07 Thread GitBox


mjsax merged PR #12091:
URL: https://github.com/apache/kafka/pull/12091


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on pull request #11442: KAFKA-7883 add schema.namespace support to SetSchemaMetadata SMT in Kafka Connect

2022-07-07 Thread GitBox


mjsax commented on PR #11442:
URL: https://github.com/apache/kafka/pull/11442#issuecomment-1178218780

   I am not familiar with Connect code -- @rhauch @kkonstantine @mimaison 
should be able to help.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on a diff in pull request #12370: KAFKA-13785: [10/N][emit final] more unit test for session store and disable cache for emit final sliding window

2022-07-07 Thread GitBox


mjsax commented on code in PR #12370:
URL: https://github.com/apache/kafka/pull/12370#discussion_r916256345


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java:
##
@@ -289,7 +289,10 @@ private  StoreBuilder> 
materialize(final MaterializedInt
 // do not enable cache if the emit final strategy is used
 if (materialized.cachingEnabled() && emitStrategy.type() != 
EmitStrategy.StrategyType.ON_WINDOW_CLOSE) {
 builder.withCachingEnabled();
+} else {
+builder.withCachingDisabled();

Review Comment:
   Maybe @guozhangwang can chime in?
   
   I am happy both ways I guess, but we should keep it consistent if possible. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-12943) Bug in Kafka Streams Documentation (Aggregating)

2022-07-07 Thread Marco Lotz (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17563944#comment-17563944
 ] 

Marco Lotz commented on KAFKA-12943:


The PR was raised in April and approved twice. Is there anything else missing 
to merge it to trunk?

> Bug in Kafka Streams Documentation (Aggregating)
> 
>
> Key: KAFKA-12943
> URL: https://issues.apache.org/jira/browse/KAFKA-12943
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation, streams
>Reporter: Rajesh KSV
>Assignee: Marco Lotz
>Priority: Minor
>
> In the doc, for aggregating function, the example is incorrect
> [https://kafka.apache.org/documentation/streams/developer-guide/dsl-api.html#aggregating]
> It says
> {code:java}
> KTable aggregatedStream = groupedStream.aggregate(
> () -> 0L, /* initializer */
> (aggKey, newValue, aggValue) -> aggValue + newValue.length(), /* adder */
> Materialized.as("aggregated-stream-store") /* state store name */
> .withValueSerde(Serdes.Long()); /* serde for aggregate value */{code}
> Generic types are missing. Instead, it should be 
> {code:java}
> KTable<byte[], Long> aggregatedStream = groupedStream.aggregate(
> () -> 0L, /* initializer */
> (aggKey, newValue, aggValue) -> aggValue + newValue.length(), /* adder */
> Materialized.<byte[], Long, KeyValueStore<Bytes, byte[]>>as("aggregated-stream-store") /* state store name */
> .withValueSerde(Serdes.Long()); /* serde for aggregate value */ {code}
> Otherwise, code won't work. I myself verified it. 
> Reference
> [https://stackoverflow.com/questions/51040555/the-method-withvalueserde-in-the-type-materialized-is-not-applicable/51049472]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] vineethgn commented on pull request #9188: break when dst is full so that unwrap isn't called when appreadbuffer…

2022-07-07 Thread GitBox


vineethgn commented on PR #9188:
URL: https://github.com/apache/kafka/pull/9188#issuecomment-1178145392

   Based on the discussion in https://github.com/apache/kafka/pull/5785/, I 
tried the fix in this PR, but it didn't work for me. 
   
   I can reproduce the issue consistently under the following conditions:
   
   1. [Conscrypt](https://github.com/google/conscrypt) as SSL provider 
   2. openjdk-11-jdk
   
   I checked by adding additional logs and found that the packet buffer size 
and the app buffer size returned by the Conscrypt SSL engine mismatch. Below are 
the logs that I got:
   
   SSl engine class org.conscrypt.Java8EngineWrapper 
(org.apache.kafka.common.network.SslTransportLayer)
   SSl engine session class org.conscrypt.Java8ExtendedSSLSession 
(org.apache.kafka.common.network.SslTransportLayer)
   SSl engine session App Buffer Size 16384 
(org.apache.kafka.common.network.SslTransportLayer)
   SSl engine session Net Buffer Size 16709 
(org.apache.kafka.common.network.SslTransportLayer) 
   
   Since this was a blocker, I modified the SslTransportLayer to make 
appReadBuffer at least the same size as netReadBuffer before the call to unwrap. 
Is it OK to have appReadBuffer at least the same size as netReadBuffer?
   
   So far the clusters are running smoothly without buffer overflow or 
underflow issues. 
   
   Below is the code fix that I made, which has been working fine so far 
without further issues:
   
   while (netReadBuffer.position() > 0) {
       netReadBuffer.flip();
       SSLEngineResult unwrapResult;
       try {
           appReadBuffer = Utils.ensureCapacity(appReadBuffer, netReadBufferSize());
           unwrapResult = sslEngine.unwrap(netReadBuffer, appReadBuffer);


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-14055) Transaction markers may be lost during cleaning if data keys conflict with marker keys

2022-07-07 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14055?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson reassigned KAFKA-14055:
---

Assignee: Jason Gustafson

> Transaction markers may be lost during cleaning if data keys conflict with 
> marker keys
> --
>
> Key: KAFKA-14055
> URL: https://issues.apache.org/jira/browse/KAFKA-14055
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Critical
> Fix For: 3.3.0, 3.0.2, 3.1.2, 3.2.1
>
>
> We have recently been seeing hanging transactions occur on streams changelog 
> topics quite frequently. After investigation, we found that the keys used in 
> the changelog topic conflict with the keys used in the transaction markers 
> (the schema used in control records is 4 bytes, which happens to be the same 
> for the changelog topics that we investigated). When we build the offset map 
> prior to cleaning, we do properly exclude the transaction marker keys, but 
> the bug is the fact that we do not exclude them during the cleaning phase. 
> This can result in the marker being removed from the cleaned log before the 
> corresponding data is removed when there is a user record with a conflicting 
> key at a higher offset. A side effect of this is a so-called "hanging" 
> transaction, but the bigger problem is that we lose the atomicity of the 
> transaction. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14055) Transaction markers may be lost during cleaning if data keys conflict with marker keys

2022-07-07 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-14055:
---

 Summary: Transaction markers may be lost during cleaning if data 
keys conflict with marker keys
 Key: KAFKA-14055
 URL: https://issues.apache.org/jira/browse/KAFKA-14055
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Gustafson
 Fix For: 3.3.0, 3.0.2, 3.1.2, 3.2.1


We have recently been seeing hanging transactions occur on streams changelog 
topics quite frequently. After investigation, we found that the keys used in 
the changelog topic conflict with the keys used in the transaction markers (the 
schema used in control records is 4 bytes, which happens to be the same for the 
changelog topics that we investigated). When we build the offset map prior to 
cleaning, we do properly exclude the transaction marker keys, but the bug is 
the fact that we do not exclude them during the cleaning phase. This can result 
in the marker being removed from the cleaned log before the corresponding data 
is removed when there is a user record with a conflicting key at a higher 
offset. A side effect of this is a so-called "hanging" transaction, but the 
bigger problem is that we lose the atomicity of the transaction. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] guozhangwang merged pull request #12297: KAFKA-13846: Follow up PR to address review comments

2022-07-07 Thread GitBox


guozhangwang merged PR #12297:
URL: https://github.com/apache/kafka/pull/12297


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a diff in pull request #12297: KAFKA-13846: Follow up PR to address review comments

2022-07-07 Thread GitBox


guozhangwang commented on code in PR #12297:
URL: https://github.com/apache/kafka/pull/12297#discussion_r916180825


##
docs/upgrade.html:
##
@@ -19,6 +19,56 @@
 
 

[GitHub] [kafka] pjmagee commented on pull request #11442: KAFKA-7883 add schema.namespace support to SetSchemaMetadata SMT in Kafka Connect

2022-07-07 Thread GitBox


pjmagee commented on PR #11442:
URL: https://github.com/apache/kafka/pull/11442#issuecomment-1178051631

   @mjsax I can see you have been active here recently; is there any way we can 
get @mnegodaev's PR merged?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #12388: KAFKA-14013: Limit the length of the `reason` field sent on the wire

2022-07-07 Thread GitBox


dajac commented on code in PR #12388:
URL: https://github.com/apache/kafka/pull/12388#discussion_r916162985


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##
@@ -1050,10 +1051,18 @@ public synchronized void requestRejoin(final String 
shortReason) {
 public synchronized void requestRejoin(final String shortReason,
final String fullReason) {
 log.info("Request joining group due to: {}", fullReason);
-this.rejoinReason = shortReason;
+this.rejoinReason = truncateIfRequired(shortReason);

Review Comment:
   In my opinion, it would be better to do this when the JoinGroupRequestData 
is created. It is here: 
https://github.com/apache/kafka/blob/64ac302b1c6baa4b28e6fb90776985ac242d41e3/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L563
   
   This ensures that we cover all the paths.
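   For illustration, a minimal sketch of what that could look like; the 255-character cap is only an assumption for the sketch, not necessarily the limit the PR ends up using:
   
   ```java
   // Hypothetical helper, applied right where the request data is built so every send path is covered.
   private static String truncateIfRequired(final String reason) {
       if (reason != null && reason.length() > 255) {
           return reason.substring(0, 255);
       }
       return reason;
   }
   ```
   
   It would then be invoked at request-construction time, e.g. `.setReason(truncateIfRequired(rejoinReason))` on the JoinGroupRequestData, rather than when the reason is first recorded.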



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #12388: KAFKA-14013: Limit the length of the `reason` field sent on the wire

2022-07-07 Thread GitBox


dajac commented on code in PR #12388:
URL: https://github.com/apache/kafka/pull/12388#discussion_r916166251


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##
@@ -1112,9 +1121,10 @@ public synchronized RequestFuture 
maybeLeaveGroup(String leaveReason) {
 // attempt any resending if the request fails or times out.
 log.info("Member {} sending LeaveGroup request to coordinator {} 
due to {}",
 generation.memberId, coordinator, leaveReason);
+final String reason = truncateIfRequired(leaveReason);
 LeaveGroupRequest.Builder request = new LeaveGroupRequest.Builder(
 rebalanceConfig.groupId,
-Collections.singletonList(new 
MemberIdentity().setMemberId(generation.memberId).setReason(leaveReason))
+Collections.singletonList(new 
MemberIdentity().setMemberId(generation.memberId).setReason(reason))

Review Comment:
   nit: Should we call `truncateIfRequired` inline here? Is it possible to also 
test this path?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##
@@ -1050,10 +1051,18 @@ public synchronized void requestRejoin(final String 
shortReason) {
 public synchronized void requestRejoin(final String shortReason,
final String fullReason) {
 log.info("Request joining group due to: {}", fullReason);
-this.rejoinReason = shortReason;
+this.rejoinReason = truncateIfRequired(shortReason);

Review Comment:
   In my opinion, it would be better to do this when the JoinGroupRequestData 
is created. It is here: 
https://github.com/apache/kafka/blob/64ac302b1c6baa4b28e6fb90776985ac242d41e3/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L563.
 This ensures that we cover all the paths.



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java:
##
@@ -571,6 +571,11 @@ public void testRejoinReason() {
 expectSyncGroup(generation, memberId);
 ensureActiveGroup(generation, memberId);
 assertEquals("", coordinator.rejoinReason());
+
+// check limit length of reason field
+
mockClient.prepareResponse(joinGroupFollowerResponse(defaultGeneration, 
memberId, JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.GROUP_MAX_SIZE_REACHED));

Review Comment:
   It would be better to do a test similar to the one at L557. That one 
verifies the content put in the JoinGroupRequest as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rayokota commented on pull request #12126: KAFKA-8713 KIP-581: Add new conf serialize.accept.optional.null in connect-json

2022-07-07 Thread GitBox


rayokota commented on PR #12126:
URL: https://github.com/apache/kafka/pull/12126#issuecomment-1178039321

   The fix is not quite correct.  You need to conditionally call 
`struct.getWithoutDefault(field.name())`.  You should also add a test using a 
struct.  See the corresponding fix for the JSON Schema converter in 
https://github.com/confluentinc/schema-registry/pull/2326


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on pull request #12388: KAFKA-14013: Limit the length of the `reason` field sent on the wire

2022-07-07 Thread GitBox


dajac commented on PR #12388:
URL: https://github.com/apache/kafka/pull/12388#issuecomment-1178035300

   @etolbakov Thanks for the PR. When the jira is assigned, it usually means 
that someone is working on it. In this case, I know that @jnh5y wanted to fix 
this. @jnh5y If you haven't started yet, we could perhaps review this one.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lihaosky commented on a diff in pull request #12370: KAFKA-13785: [10/N][emit final] more unit test for session store and disable cache for emit final sliding window

2022-07-07 Thread GitBox


lihaosky commented on code in PR #12370:
URL: https://github.com/apache/kafka/pull/12370#discussion_r916160977


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java:
##
@@ -289,7 +289,10 @@ private  StoreBuilder> 
materialize(final MaterializedInt
 // do not enable cache if the emit final strategy is used
 if (materialized.cachingEnabled() && emitStrategy.type() != 
EmitStrategy.StrategyType.ON_WINDOW_CLOSE) {
 builder.withCachingEnabled();
+} else {
+builder.withCachingDisabled();

Review Comment:
   I want to make it explicit in case the default is caching enabled. I can remove it 
if you think it's unnecessary. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax merged pull request #12383: REVERT: Kip-770

2022-07-07 Thread GitBox


mjsax merged PR #12383:
URL: https://github.com/apache/kafka/pull/12383


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang commented on pull request #12383: REVERT: Kip-770

2022-07-07 Thread GitBox


guozhangwang commented on PR #12383:
URL: https://github.com/apache/kafka/pull/12383#issuecomment-1178024477

   Thanks @mjsax , LGTM! Please feel free to merge.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jsancio commented on pull request #12274: KAFKA-13959: Controller should unfence Broker with busy metadata log

2022-07-07 Thread GitBox


jsancio commented on PR #12274:
URL: https://github.com/apache/kafka/pull/12274#issuecomment-1177949852

   > I haven't fleshed it out; the basic idea is shown in this PR: only read up to 
Isolation.COMMITTED when reading from an observer and invoke 
fetchPurgatory.maybeComplete(highWatermark.offset, currentTimeMs) on 
onUpdateLeaderHighWatermark.
   
   I see. This would not work for co-located Kafka servers, right? Co-located 
Kafka servers are servers that run both a controller and a broker. In 
that case the replica will read uncommitted data and the leader will not send a 
FETCH response when the HW changes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] methodmissing commented on pull request #12385: MINOR: Expose client information on RequestContext as additional public API beyond request logs (continuation of KIP 511)

2022-07-07 Thread GitBox


methodmissing commented on PR #12385:
URL: https://github.com/apache/kafka/pull/12385#issuecomment-1177916112

   Thanks! I'll draft something
   
   > I forgot to mention that AuthorizableRequestContext is a public interface 
so we need a KIP for this change. The process is described here: 
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] methodmissing commented on pull request #12385: MINOR: Expose client information on RequestContext as additional public API beyond request logs (continuation of KIP 511)

2022-07-07 Thread GitBox


methodmissing commented on PR #12385:
URL: https://github.com/apache/kafka/pull/12385#issuecomment-1177906495

   @dajac thanks for the prompt reply. We at Shopify currently have a custom 
authoriser deployed where we log context like `principal`, `clientId`, `topic`, 
etc. to compacted topics dedicated to producer and consumer metrics, to build up 
insight into our consumer landscape.
   
   We'd also be interested in collecting additional information about the 
clients, given our client landscape (and this is true for most larger 
organisations) is diverse and varies between `Ruby`, `Java`, `Go` and `Python`, 
plus different versions of each.
   
   Given KIP 511 introduced support for tracking client information per 
connection, and this information is already present in the request context, it 
could be valuable to also expose it to plugins.
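   To make the ask concrete, a hedged sketch of what such a plugin might look like; `clientInformation()` on AuthorizableRequestContext does not exist today and is named here purely for illustration, while `principal()` and `clientId()` are existing accessors:
   
   ```java
   // Fragment of a custom Authorizer's authorize() method, logging the request context it already
   // has access to; the commented-out call shows the kind of accessor this PR would add.
   @Override
   public List<AuthorizationResult> authorize(final AuthorizableRequestContext context,
                                              final List<Action> actions) {
       for (final Action action : actions) {
           log.info("principal={} clientId={} resource={}",
               context.principal(), context.clientId(), action.resourcePattern().name());
           // Proposed (hypothetical): context.clientInformation().softwareName() / .softwareVersion()
       }
       // allow everything in this sketch; a real plugin would evaluate ACLs here
       return actions.stream().map(a -> AuthorizationResult.ALLOWED).collect(Collectors.toList());
   }
   ```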


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on pull request #12385: MINOR: Expose client information on RequestContext as additional public API beyond request logs (continuation of KIP 511)

2022-07-07 Thread GitBox


dajac commented on PR #12385:
URL: https://github.com/apache/kafka/pull/12385#issuecomment-1177901616

   @methodmissing I forgot to mention that `AuthorizableRequestContext` is a 
public interface so we need a KIP for this change. The process is described 
here: 
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jsancio commented on a diff in pull request #12265: KAFKA-13968: Fix 3 major bugs of KRaft snapshot generating

2022-07-07 Thread GitBox


jsancio commented on code in PR #12265:
URL: https://github.com/apache/kafka/pull/12265#discussion_r916060600


##
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala:
##
@@ -117,26 +117,32 @@ class BrokerMetadataListener(
   } finally {
 reader.close()
   }
-  _publisher.foreach(publish)
 
-  // If we detected a change in metadata.version, generate a local snapshot
-  val metadataVersionChanged = Option(_delta.featuresDelta()).exists { 
featuresDelta =>
-featuresDelta.metadataVersionChange().isPresent
+  _bytesSinceLastSnapshot = _bytesSinceLastSnapshot + results.numBytes
+  if (shouldSnapshot()) {
+maybeStartSnapshot()
   }
 
-  snapshotter.foreach { snapshotter =>
-_bytesSinceLastSnapshot = _bytesSinceLastSnapshot + results.numBytes
-if (shouldSnapshot() || metadataVersionChanged) {
-  if (snapshotter.maybeStartSnapshot(_highestTimestamp, 
_delta.apply())) {
-_bytesSinceLastSnapshot = 0L
-  }
-}
-  }
+  _publisher.foreach(publish)
 }
   }
 
   private def shouldSnapshot(): Boolean = {
-_bytesSinceLastSnapshot >= maxBytesBetweenSnapshots
+(_bytesSinceLastSnapshot >= maxBytesBetweenSnapshots) || 
metadataVersionChanged()
+  }
+
+  private def metadataVersionChanged(): Boolean = {
+_publisher.nonEmpty && Option(_delta.featuresDelta()).exists { 
featuresDelta =>

Review Comment:
   I see. Before this PR the broker was generating a snapshot when handling 
committed records that didn't contain a feature change because we only reset 
`_delta` after the listener calls `publish`. This PR makes sense to me. Let's 
document this decision in this method.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] tombentley commented on a diff in pull request #12347: KAFKA-13919: expose log recovery metrics

2022-07-07 Thread GitBox


tombentley commented on code in PR #12347:
URL: https://github.com/apache/kafka/pull/12347#discussion_r916044426


##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -307,6 +313,22 @@ class LogManager(logDirs: Seq[File],
 log
   }
 
+  import java.util.concurrent.ThreadFactory
+
+  // factory class for naming the log recovery threads used in metrics
+  class LogRecoveryThreadFactory(val baseName: String) extends ThreadFactory {
+val threadsNum = new AtomicInteger(0)

Review Comment:
   Singular `threadNum`?



##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -410,12 +437,34 @@ class LogManager(logDirs: Seq[File],
 error(s"There was an error in one of the threads during logs loading: 
${e.getCause}")
 throw e.getCause
 } finally {
+  removeLogRecoveryMetrics()
   threadPools.foreach(_.shutdown())
 }
 
 info(s"Loaded $numTotalLogs logs in ${time.hiResClockMs() - startMs}ms.")
   }
 
+  private[log] def addLogRecoveryMetrics(): Unit = {
+for (dir <- logDirs) {
+  newGauge("remainingLogsToRecover", () => 
numRemainingLogs.get(dir.getAbsolutePath),
+Map("dir" -> dir.getAbsolutePath))
+  for (i <- 0 until numRecoveryThreadsPerDataDir) {
+val threadName = s"log-recovery-${dir.getAbsolutePath}-$i"
+newGauge("remainingSegmentsToRecover", () => 
numRemainingSegments.get(threadName),
+  Map("dir" -> dir.getAbsolutePath, "threadNum" -> i.toString))
+  }
+}
+  }
+
+  private[log] def removeLogRecoveryMetrics(): Unit = {
+for (dir <- logDirs) {
+  removeMetric("remainingLogsToRecover", Map("dir" -> dir.getAbsolutePath))
+  for (i <- 0 until numRecoveryThreadsPerDataDir) {

Review Comment:
   `numRecoveryThreadsPerDataDir` can be changed, so what happens if it changes 
after the metrics are added and before they're removed?



##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -410,12 +437,34 @@ class LogManager(logDirs: Seq[File],
 error(s"There was an error in one of the threads during logs loading: 
${e.getCause}")
 throw e.getCause
 } finally {
+  removeLogRecoveryMetrics()
   threadPools.foreach(_.shutdown())
 }
 
 info(s"Loaded $numTotalLogs logs in ${time.hiResClockMs() - startMs}ms.")
   }
 
+  private[log] def addLogRecoveryMetrics(): Unit = {
+for (dir <- logDirs) {
+  newGauge("remainingLogsToRecover", () => 
numRemainingLogs.get(dir.getAbsolutePath),
+Map("dir" -> dir.getAbsolutePath))
+  for (i <- 0 until numRecoveryThreadsPerDataDir) {
+val threadName = s"log-recovery-${dir.getAbsolutePath}-$i"

Review Comment:
   Can we encapsulate this in a method, since it's duplicating the logic in the 
ThreadFactory?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] tombentley commented on a diff in pull request #12296: KAFKA-13996: log.cleaner.io.max.bytes.per.second can be changed dynamically

2022-07-07 Thread GitBox


tombentley commented on code in PR #12296:
URL: https://github.com/apache/kafka/pull/12296#discussion_r916026938


##
core/src/main/scala/kafka/log/LogCleaner.scala:
##
@@ -186,11 +186,18 @@ class LogCleaner(initialConfig: CleanerConfig,
   }
 
   /**
-* Reconfigure log clean config. This simply stops current log cleaners and 
creates new ones.
+* Reconfigure log clean config. It will (1) update desiredRatePerSec in 
Throttler with logCleanerIoMaxBytesPerSecond if necessary (2) stop current log 
cleaners and create new ones.

Review Comment:
   ```suggestion
   * Reconfigure log clean config. This will:
   * 1. update desiredRatePerSec in Throttler with 
logCleanerIoMaxBytesPerSecond, if necessary 
   * 2. stop current log cleaners and create new ones.
   ```



##
core/src/main/scala/kafka/utils/Throttler.scala:
##
@@ -36,7 +36,7 @@ import scala.math._
  * @param time: The time implementation to use
  */
 @threadsafe
-class Throttler(desiredRatePerSec: Double,
+class Throttler(var desiredRatePerSec: Double,

Review Comment:
   That seems reasonable.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14048) The Next Generation of the Consumer Rebalance Protocol

2022-07-07 Thread Travis Bischel (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17563831#comment-17563831
 ] 

Travis Bischel commented on KAFKA-14048:


From a technical perspective, this is a very interesting and compelling 
proposal.

Can the KIP list be updated to include this? I've noticed the KIP list has not 
been updated much and there are a good few KIPs missing. Also, there is no 
public discussion link: has there been any public discussion on this proposal?

> The Next Generation of the Consumer Rebalance Protocol
> --
>
> Key: KAFKA-14048
> URL: https://issues.apache.org/jira/browse/KAFKA-14048
> Project: Kafka
>  Issue Type: Improvement
>Reporter: David Jacot
>Assignee: David Jacot
>Priority: Major
>
> This Jira tracks the development of KIP-848: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] jsancio commented on a diff in pull request #12265: KAFKA-13968: Fix 3 major bugs of KRaft snapshot generating

2022-07-07 Thread GitBox


jsancio commented on code in PR #12265:
URL: https://github.com/apache/kafka/pull/12265#discussion_r915992223


##
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala:
##
@@ -117,26 +117,32 @@ class BrokerMetadataListener(
   } finally {
 reader.close()
   }
-  _publisher.foreach(publish)
 
-  // If we detected a change in metadata.version, generate a local snapshot
-  val metadataVersionChanged = Option(_delta.featuresDelta()).exists { 
featuresDelta =>
-featuresDelta.metadataVersionChange().isPresent
+  _bytesSinceLastSnapshot = _bytesSinceLastSnapshot + results.numBytes
+  if (shouldSnapshot()) {
+maybeStartSnapshot()
   }
 
-  snapshotter.foreach { snapshotter =>
-_bytesSinceLastSnapshot = _bytesSinceLastSnapshot + results.numBytes
-if (shouldSnapshot() || metadataVersionChanged) {
-  if (snapshotter.maybeStartSnapshot(_highestTimestamp, 
_delta.apply())) {
-_bytesSinceLastSnapshot = 0L
-  }
-}
-  }
+  _publisher.foreach(publish)
 }
   }
 
   private def shouldSnapshot(): Boolean = {
-_bytesSinceLastSnapshot >= maxBytesBetweenSnapshots
+(_bytesSinceLastSnapshot >= maxBytesBetweenSnapshots) || 
metadataVersionChanged()
+  }
+
+  private def metadataVersionChanged(): Boolean = {
+_publisher.nonEmpty && Option(_delta.featuresDelta()).exists { 
featuresDelta =>

Review Comment:
   Do you mind writing a comment as to why we check that the publisher is set? 
If I understand this correctly it is not a correctness issue but a performance 
issue, right? If I remember correctly, @mumrah mentioned that he wanted to 
generate a snapshot whenever the metadata version changes. Unfortunately, I 
couldn't find a mention of this in KIP-778.
   
   With this change this is no longer true. What do you think @mumrah ?



##
core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala:
##
@@ -240,6 +239,42 @@ class BrokerMetadataListenerTest {
 }
   }
 
+  @Test
+  def testNotSnapshotAfterMetadataVersionChangeBeforePublishing(): Unit = {
+val snapshotter = new MockMetadataSnapshotter()
+val listener = newBrokerMetadataListener(snapshotter = Some(snapshotter),
+  maxBytesBetweenSnapshots = 1000L)
+
+updateFeature(listener, feature = MetadataVersion.FEATURE_NAME, 
MetadataVersion.latest.featureLevel(), 100L)
+listener.getImageRecords().get()
+assertEquals(-1L, snapshotter.activeSnapshotOffset, "We won't generate 
snapshot on metadata version change before starting publishing")
+  }
+
+  @Test
+  def testSnapshotAfterMetadataVersionChangeWhenStarting(): Unit = {
+val snapshotter = new MockMetadataSnapshotter()
+val listener = newBrokerMetadataListener(snapshotter = Some(snapshotter),
+  maxBytesBetweenSnapshots = 1000L)
+
+val endOffset = 100L
+updateFeature(listener, feature = MetadataVersion.FEATURE_NAME, 
MetadataVersion.latest.featureLevel(), endOffset)
+listener.startPublishing(new MockMetadataPublisher()).get()
+assertEquals(endOffset, snapshotter.activeSnapshotOffset, "We should try 
to generate snapshot when starting publishing")
+  }
+
+  @Test
+  def testSnapshotAfterMetadataVersionChange(): Unit = {
+val snapshotter = new MockMetadataSnapshotter()
+val listener = newBrokerMetadataListener(snapshotter = Some(snapshotter),
+  maxBytesBetweenSnapshots = 1000L)
+listener.startPublishing(new MockMetadataPublisher()).get()
+
+val endOffset = 100L
+updateFeature(listener, feature = MetadataVersion.FEATURE_NAME, 
(MetadataVersion.latest().featureLevel() - 1).toShort, endOffset)
+listener.getImageRecords().get()

Review Comment:
   Let's write a comment saying that this `get` is waiting for the metadata 
version update above to get processed.



##
core/src/main/scala/kafka/server/metadata/BrokerMetadataSnapshotter.scala:
##
@@ -51,20 +51,26 @@ class BrokerMetadataSnapshotter(
   val eventQueue = new KafkaEventQueue(time, logContext, 
threadNamePrefix.getOrElse(""))
 
   override def maybeStartSnapshot(lastContainedLogTime: Long, image: 
MetadataImage): Boolean = synchronized {
-if (_currentSnapshotOffset == -1L) {
+if (_currentSnapshotOffset != -1) {
+  info(s"Declining to create a new snapshot at 
${image.highestOffsetAndEpoch()} because " +
+s"there is already a snapshot in progress at offset 
${_currentSnapshotOffset}")

Review Comment:
   Nit, the curly brackets ({}) are not needed for `_currentSnapshotOffset`. 
This comment applies to a few places.



##
core/src/main/scala/kafka/server/metadata/BrokerMetadataSnapshotter.scala:
##
@@ -109,4 +115,9 @@ class BrokerMetadataSnapshotter(
 beginShutdown()
 eventQueue.close()
   }
+
+  // VisibleForTesting
+  def currentSnapshotOffset(): Long = {
+_currentSnapshotOffset
+  }

Review Comment:
   It doesn't look like you use th

[jira] [Created] (KAFKA-14054) Unexpected client shutdown as TimeoutException is thrown as IllegalStateException

2022-07-07 Thread Donald (Jira)
Donald created KAFKA-14054:
--

 Summary: Unexpected client shutdown as TimeoutException is thrown 
as IllegalStateException
 Key: KAFKA-14054
 URL: https://issues.apache.org/jira/browse/KAFKA-14054
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 3.1.1, 3.2.0, 3.1.0
Reporter: Donald


 Re: 
https://forum.confluent.io/t/bug-timeoutexception-is-thrown-as-illegalstateexception-causing-client-shutdown/5460/2

1) TimeoutException is rethrown as IllegalStateException in 
{_}org.apache.kafka.streams.processor.internals.StreamTask#commitNeeded{_}, 
which causes the client to shut down in 
{_}org.apache.kafka.streams.KafkaStreams#getActionForThrowable{_}.

2) Should the timeout be a recoverable error that the user is expected to 
handle? (See the sketch below.)

3) This issue is exposed by KAFKA-12887, which was introduced in kafka-streams 
3.1.0.
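
For illustration, a minimal sketch of registering a user-side handler 
(hypothetical names; assumes Kafka Streams 2.8+). Because the wrapped 
IllegalStateException is treated as not-to-be-handled-by-users, this handler is 
bypassed and the client shuts down anyway:
{code:java|title=Sketch: user-side uncaught exception handler (illustrative only)}
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;

// Assumes an existing `topology` and `props`.
final KafkaStreams streams = new KafkaStreams(topology, props);

// The handler is only consulted for exceptions Kafka Streams considers
// user-handleable; a wrapped IllegalStateException is not, so the client
// still opts for SHUTDOWN_CLIENT in the scenario described above.
streams.setUncaughtExceptionHandler(throwable ->
    StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD);

streams.start();
{code}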

*code referenced*
{code:java|title=org.apache.kafka.streams.processor.internals.StreamTask#commitNeeded}
public boolean commitNeeded() {
    if (commitNeeded) {
        return true;
    } else {
        for (final Map.Entry<TopicPartition, Long> entry : consumedOffsets.entrySet()) {
            final TopicPartition partition = entry.getKey();
            try {
                final long offset = mainConsumer.position(partition);
                if (offset > entry.getValue() + 1) {
                    commitNeeded = true;
                    entry.setValue(offset - 1);
                }
            } catch (final TimeoutException error) {
                // the `consumer.position()` call should never block, because we know that we did process data
                // for the requested partition and thus the consumer should have a valid local position
                // that it can return immediately

                // hence, a `TimeoutException` indicates a bug and thus we rethrow it as fatal `IllegalStateException`
                throw new IllegalStateException(error);
            } catch (final KafkaException fatal) {
                throw new StreamsException(fatal);
            }
        }

        return commitNeeded;
    }
}
{code}


{code:java|title=org.apache.kafka.streams.KafkaStreams#getActionForThrowable}
private StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse getActionForThrowable(final Throwable throwable,
                                                                                             final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
    final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse action;
    if (wrappedExceptionIsIn(throwable, EXCEPTIONS_NOT_TO_BE_HANDLED_BY_USERS)) {
        action = SHUTDOWN_CLIENT;
    } else {
        action = streamsUncaughtExceptionHandler.handle(throwable);
    }
    return action;
}

private void handleStreamsUncaughtException(final Throwable throwable,
                                            final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler,
                                            final boolean skipThreadReplacement) {
    final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse action = getActionForThrowable(throwable, streamsUncaughtExceptionHandler);
    if (oldHandler) {
        log.warn("Stream's new uncaught exception handler is set as well as the deprecated old handler." +
            "The old handler will be ignored as long as a new handler is set.");
    }
    switch (action) {
        case REPLACE_THREAD:
            if (!skipThreadReplacement) {
                log.error("Replacing thread in the streams uncaught exception handler", throwable);
                replaceStreamThread(throwable);
            } else {
                log.debug("Skipping thread replacement for recoverable error");
            }
            break;
        case SHUTDOWN_CLIENT:
            log.error("Encountered the following exception during processing " +
                "and Kafka Streams opted to " + action + "." +
                " The streams client is going to shut down now. ", throwable);
            closeToError();
            break;
{code}

 *Stacktrace*

{code:java|title=error log kafka-streams v. 3.1.0}
2022-06-22 13:58:35,796 ERROR 
thread=[com_stmartin_hammer_v3_platform-hammer-facade-fdc90fab-ed3a-4e62-b458-e73f80e6975d-StreamThread-1]
 logger=o.a.k.s.KafkaStreams - stream-client 
[com_stmartin_hammer_v3_platform-hammer-facade-fdc90fab-ed3a-4e62-b458-e73f80e6975d]
 Encountered the following exception during processing and Kafka Streams opted 
to SHUTDOWN_CLIENT. The streams client is going to shut down now.
org.apache.kafka.streams.errors.StreamsException: 
java.lang.Illega

[GitHub] [kafka] dajac commented on pull request #12385: MINOR: Expose client information on RequestContext as additional public API beyond request logs (continuation of KIP 511)

2022-07-07 Thread GitBox


dajac commented on PR #12385:
URL: https://github.com/apache/kafka/pull/12385#issuecomment-1177681675

   @methodmissing Thanks for the PR. Could you elaborate a little more on the 
motivation for this change? Also, don't we need to extend the 
`AuthorizableRequestContext` if you want to get the client information in the 
plugins? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] singhnama commented on a diff in pull request #12359: KAFKA-13983: Fail the creation with "/" in resource name in zk ACL

2022-07-07 Thread GitBox


singhnama commented on code in PR #12359:
URL: https://github.com/apache/kafka/pull/12359#discussion_r915906279


##
core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala:
##
@@ -209,6 +213,9 @@ class AclAuthorizer extends Authorizer with Logging {
   throw new UnsupportedVersionException(s"Adding ACLs on prefixed 
resource patterns requires " +
 s"${KafkaConfig.InterBrokerProtocolVersionProp} of $IBP_2_0_IV1 or 
greater")
 }
+if (inValidAclBindingResourceName(aclBinding.pattern().name())) {

Review Comment:
   updated



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on a diff in pull request #12296: KAFKA-13996: log.cleaner.io.max.bytes.per.second can be changed dynamically

2022-07-07 Thread GitBox


showuon commented on code in PR #12296:
URL: https://github.com/apache/kafka/pull/12296#discussion_r915850442


##
core/src/test/scala/unit/kafka/log/LogCleanerTest.scala:
##
@@ -1856,7 +1856,7 @@ class LogCleanerTest {
   @Test
   def testReconfigureLogCleanerIoMaxBytesPerSecond(): Unit = {
 val oldKafkaProps = TestUtils.createBrokerConfig(1, "localhost:2181")
-oldKafkaProps.put(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, 1000)
+oldKafkaProps.put(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, 1000: 
java.lang.Double)

Review Comment:
   nit: I think we can just put `1000L` here.



##
core/src/test/scala/unit/kafka/log/LogCleanerTest.scala:
##
@@ -1856,7 +1856,7 @@ class LogCleanerTest {
   @Test
   def testReconfigureLogCleanerIoMaxBytesPerSecond(): Unit = {
 val oldKafkaProps = TestUtils.createBrokerConfig(1, "localhost:2181")
-oldKafkaProps.put(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, 1000)
+oldKafkaProps.put(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, 1000: 
java.lang.Double)

Review Comment:
   nit: I think we can just put `1000L` here for long value.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] tyamashi-oss commented on a diff in pull request #12296: KAFKA-13996: log.cleaner.io.max.bytes.per.second can be changed dynamically

2022-07-07 Thread GitBox


tyamashi-oss commented on code in PR #12296:
URL: https://github.com/apache/kafka/pull/12296#discussion_r915753889


##
core/src/test/scala/unit/kafka/log/LogCleanerTest.scala:
##
@@ -1854,6 +1853,33 @@ class LogCleanerTest {
 } finally logCleaner.shutdown()
   }
 
+  @Test
+  def testReconfigureLogCleanerIoMaxBytesPerSecond(): Unit = {
+val oldKafkaProps = TestUtils.createBrokerConfig(1, "localhost:2181")
+oldKafkaProps.put(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, 1000)
+
+val logCleaner = new LogCleaner(LogCleaner.cleanerConfig(new 
KafkaConfig(oldKafkaProps)),
+  logDirs = Array(TestUtils.tempDir()),
+  logs = new Pool[TopicPartition, UnifiedLog](),
+  logDirFailureChannel = new LogDirFailureChannel(1),
+  time = time) {
+  // shutdown() and startup() are called in LogCleaner.reconfigure().
+  // Empty startup() and shutdown() to ensure that no unnecessary log 
cleaner threads remain after this test.
+  override def startup(): Unit = {}
+  override def shutdown(): Unit = {}
+}
+
+try {
+  assertEquals(1000, logCleaner.throttler.desiredRatePerSec, 
s"Throttler.desiredRatePerSec should be initialized from initial 
`${KafkaConfig.LogCleanerIoMaxBytesPerSecondProp}` config.")
+
+  val newKafkaProps = TestUtils.createBrokerConfig(1, "localhost:2181")
+  newKafkaProps.put(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, 
2000)
+
+  logCleaner.reconfigure(new KafkaConfig(oldKafkaProps), new 
KafkaConfig(newKafkaProps))
+
+  assertEquals(2000, logCleaner.throttler.desiredRatePerSec, 
s"Throttler.desiredRatePerSec should be updated with new 
`${KafkaConfig.LogCleanerIoMaxBytesPerSecondProp}` config.")
+} finally logCleaner.shutdown();

Review Comment:
   I've fixed it. Thank you for pointing it out. 
https://github.com/apache/kafka/pull/12296/commits/5c727409cc03a8b459f6d30b8df888489f52d743
   With this fix, the java8/scala2.12 build now succeeds.
   
   However, the `JDK 11 / Scala 2.13` build, which used to succeed, now fails. 
   I believe this new build failure is unrelated to my changes. I ran the same 
command once in my local environment and the build was successful.
   ```
   [2022-07-07T06:22:53.589Z] > Task :streams:integrationTest FAILED
   
   [2022-07-07T06:22:53.589Z] 
   
   [2022-07-07T06:22:53.589Z] FAILURE: Build failed with an exception.
   ```
   
https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-12296/7/pipeline/



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] pch8388 opened a new pull request, #12389: MINOR: Fix result string

2022-07-07 Thread GitBox


pch8388 opened a new pull request, #12389:
URL: https://github.com/apache/kafka/pull/12389

   Removes duplicate result strings and avoids mistakes when changing the 
string format
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14053) Transactional producer should bump the epoch when a batch encounters delivery timeout

2022-07-07 Thread Daniel Urban (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14053?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Urban updated KAFKA-14053:
-
Description: 
When a batch fails due to delivery timeout, it is possible that the batch is 
still in-flight. Due to underlying infra issues, it is possible that an 
EndTxnRequest and a WriteTxnMarkerRequest are processed before the in-flight 
batch is processed on the leader. This can cause transactional batches to be 
appended to the log after the corresponding abort marker.

This can cause the LSO to be infinitely blocked in the partition, or can even 
violate processing guarantees, as the out-of-order batch can become part of the 
next transaction.

Because of this, the producer should skip aborting the partition, and bump the 
epoch to fence the in-flight requests.

  was:
When a batch fails due to delivery timeout, it is possible that the batch is 
still in-flight. Due to underlying infra issues, it is possible that an 
EndTxnRequest and a WriteTxnMarkerRequest are processed before the in-flight 
batch is processed on the leader. This can cause transactional batches to be 
appended to the log after the corresponding abort marker.

This can cause the LSO to be infinitely blocked in the partition, or can even 
violate processing guarantees, as the out-of-order batch can become part of the 
next transaction.


> Transactional producer should bump the epoch when a batch encounters delivery 
> timeout
> -
>
> Key: KAFKA-14053
> URL: https://issues.apache.org/jira/browse/KAFKA-14053
> Project: Kafka
>  Issue Type: Bug
>Reporter: Daniel Urban
>Assignee: Daniel Urban
>Priority: Major
>
> When a batch fails due to delivery timeout, it is possible that the batch is 
> still in-flight. Due to underlying infra issues, it is possible that an 
> EndTxnRequest and a WriteTxnMarkerRequest are processed before the in-flight 
> batch is processed on the leader. This can cause transactional batches to be 
> appended to the log after the corresponding abort marker.
> This can cause the LSO to be infinitely blocked in the partition, or can even 
> violate processing guarantees, as the out-of-order batch can become part of 
> the next transaction.
> Because of this, the producer should skip aborting the partition, and bump 
> the epoch to fence the in-flight requests.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14053) Transactional producer should bump the epoch when a batch encounters delivery timeout

2022-07-07 Thread Daniel Urban (Jira)
Daniel Urban created KAFKA-14053:


 Summary: Transactional producer should bump the epoch when a batch 
encounters delivery timeout
 Key: KAFKA-14053
 URL: https://issues.apache.org/jira/browse/KAFKA-14053
 Project: Kafka
  Issue Type: Bug
Reporter: Daniel Urban
Assignee: Daniel Urban


When a batch fails due to delivery timeout, it is possible that the batch is 
still in-flight. Due to underlying infra issues, it is possible that an 
EndTxnRequest and a WriteTxnMarkerRequest are processed before the in-flight 
batch is processed on the leader. This can cause transactional batches to be 
appended to the log after the corresponding abort marker.

This can cause the LSO to be infinitely blocked in the partition, or can even 
violate processing guarantees, as the out-of-order batch can become part of the 
next transaction.
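
For illustration, a minimal sketch (all names and values assumed) of a 
transactional producer where a batch can exceed delivery.timeout.ms while its 
request is still in flight:
{code:java|title=Sketch: transactional producer hitting a delivery timeout (illustrative only)}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // assumed address
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "example-txn-id");    // assumed id
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// A batch that exceeds delivery.timeout.ms is failed locally, but the request
// carrying it may still be in flight to the partition leader.
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000);

try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
    producer.initTransactions();
    try {
        producer.beginTransaction();
        producer.send(new ProducerRecord<>("example-topic", "key", "value"));
        producer.commitTransaction();
    } catch (Exception e) {
        // On a delivery timeout the application aborts; this ticket argues the
        // producer should also bump its epoch here so that the possibly
        // still-in-flight batch is fenced.
        producer.abortTransaction();
    }
}
{code}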



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] omkreddy commented on a diff in pull request #12359: KAFKA-13983: Fail the creation with "/" in resource name in zk ACL

2022-07-07 Thread GitBox


omkreddy commented on code in PR #12359:
URL: https://github.com/apache/kafka/pull/12359#discussion_r915675978


##
core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala:
##
@@ -209,6 +213,9 @@ class AclAuthorizer extends Authorizer with Logging {
   throw new UnsupportedVersionException(s"Adding ACLs on prefixed 
resource patterns requires " +
 s"${KafkaConfig.InterBrokerProtocolVersionProp} of $IBP_2_0_IV1 or 
greater")
 }
+if (inValidAclBindingResourceName(aclBinding.pattern().name())) {

Review Comment:
   Can we move this to the `validateAclBinding` method?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on pull request #12381: KAFKA-13474: Allow reconfiguration of SSL certs for broker to controller connection

2022-07-07 Thread GitBox


divijvaidya commented on PR #12381:
URL: https://github.com/apache/kafka/pull/12381#issuecomment-1177319338

   > Sorry, looks like the `match` case still needs default case:
   > 
   > ```
   > [Error] 
/home/jenkins/workspace/Kafka_kafka-pr_PR-12381/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scalaː190ː7:
 match may not be exhaustive.
   > ```
   
   My bad Luke! I did a hasty change yesterday without verifying the build. I 
have fixed it now.
   
   I appreciate your quick responses on this PR. If you get time please take a 
look at my other open PRs as well: 
https://github.com/apache/kafka/pulls/divijvaidya 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] elkkhan commented on pull request #12320: KAFKA-13702: Connect RestClient overrides response status code on request failure

2022-07-07 Thread GitBox


elkkhan commented on PR #12320:
URL: https://github.com/apache/kafka/pull/12320#issuecomment-1177283596

   @C0urante thanks, addressed the comments. Hopefully checkstyle is all good 
now - I wasn't able to run the Gradle task locally for some reason (it kept 
getting stuck), but now it's working and reporting no issues.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] elkkhan commented on a diff in pull request #12320: KAFKA-13702: Connect RestClient overrides response status code on request failure

2022-07-07 Thread GitBox


elkkhan commented on code in PR #12320:
URL: https://github.com/apache/kafka/pull/12320#discussion_r915638083


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestClientTest.java:
##
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime.rest;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.connect.runtime.rest.entities.ErrorMessage;
+import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
+import org.eclipse.jetty.client.HttpClient;
+import org.eclipse.jetty.client.api.ContentResponse;
+import org.eclipse.jetty.client.api.Request;
+import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import javax.crypto.SecretKey;
+import javax.ws.rs.core.Response;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Objects;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@RunWith(Enclosed.class)
+public class RestClientTest {
+
+private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+private static final TypeReference TEST_TYPE = new 
TypeReference() {
+};
+private static final SecretKey MOCK_SECRET_KEY = getMockSecretKey();
+
+private static void assertIsInternalServerError(ConnectRestException e) {
+assertEquals(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), 
e.statusCode());
+assertEquals(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), 
e.errorCode());
+}
+
+private static SecretKey getMockSecretKey() {
+SecretKey mockKey = mock(SecretKey.class);
+when(mockKey.getFormat()).thenReturn("RAW");// supported format by
+
when(mockKey.getEncoded()).thenReturn("SomeKey".getBytes(StandardCharsets.UTF_8));
+return mockKey;
+}
+
+private static RestClient.HttpResponse httpRequest(HttpClient 
httpClient, String requestSignatureAlgorithm) {
+return RestClient.httpRequest(
+httpClient,
+"https://localhost:1234/api/endpoint";,
+"GET",
+null,
+new TestDTO("requestBodyData"),
+TEST_TYPE,
+MOCK_SECRET_KEY,
+requestSignatureAlgorithm);
+}
+
+private static RestClient.HttpResponse httpRequest(HttpClient 
httpClient) {
+String validRequestSignatureAlgorithm = "HmacSHA1";
+return httpRequest(httpClient, validRequestSignatureAlgorithm);
+}
+
+
+@RunWith(Parameterized.class)
+public static class RequestFailureParameterizedTest {
+private final HttpClient httpClient = mock(HttpClient.class);
+
+@Parameterized.Parameter
+public Throwable requestException;
+
+@Parameterized.Parameters
+public static Collection requestExceptions() {
+return Arrays.asList(new Object[][]{
+{new InterruptedException()},
+{new ExecutionException(null)},
+{new TimeoutException()}
+});
+}
+
+private static Request buildThrowingMockRequest(Throwable t) throws 
ExecutionException, InterruptedException, TimeoutException {
+Request req = mock(Request.class);
+when(req.send()).thenThrow(t);

Review Comment:
   nice catch, thx.  setupHttpClient does this but I forgot to do it here and 
didn't notice the NPEs.
   fixed



[GitHub] [kafka] elkkhan commented on a diff in pull request #12320: KAFKA-13702: Connect RestClient overrides response status code on request failure

2022-07-07 Thread GitBox


elkkhan commented on code in PR #12320:
URL: https://github.com/apache/kafka/pull/12320#discussion_r915637537


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestClientTest.java:
##
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime.rest;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.connect.runtime.rest.entities.ErrorMessage;
+import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
+import org.eclipse.jetty.client.HttpClient;
+import org.eclipse.jetty.client.api.ContentResponse;
+import org.eclipse.jetty.client.api.Request;
+import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import javax.crypto.SecretKey;
+import javax.ws.rs.core.Response;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Objects;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@RunWith(Enclosed.class)
+public class RestClientTest {
+
+private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+private static final TypeReference TEST_TYPE = new 
TypeReference() {
+};
+private static final SecretKey MOCK_SECRET_KEY = getMockSecretKey();
+
+private static void assertIsInternalServerError(ConnectRestException e) {
+assertEquals(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), 
e.statusCode());
+assertEquals(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), 
e.errorCode());
+}
+
+private static SecretKey getMockSecretKey() {
+SecretKey mockKey = mock(SecretKey.class);
+when(mockKey.getFormat()).thenReturn("RAW");// supported format by
+
when(mockKey.getEncoded()).thenReturn("SomeKey".getBytes(StandardCharsets.UTF_8));
+return mockKey;
+}
+
+private static RestClient.HttpResponse httpRequest(HttpClient 
httpClient, String requestSignatureAlgorithm) {
+return RestClient.httpRequest(
+httpClient,
+"https://localhost:1234/api/endpoint";,
+"GET",
+null,
+new TestDTO("requestBodyData"),
+TEST_TYPE,
+MOCK_SECRET_KEY,
+requestSignatureAlgorithm);
+}
+
+private static RestClient.HttpResponse httpRequest(HttpClient 
httpClient) {
+String validRequestSignatureAlgorithm = "HmacSHA1";
+return httpRequest(httpClient, validRequestSignatureAlgorithm);
+}
+
+
+@RunWith(Parameterized.class)
+public static class RequestFailureParameterizedTest {
+private final HttpClient httpClient = mock(HttpClient.class);
+
+@Parameterized.Parameter
+public Throwable requestException;
+
+@Parameterized.Parameters
+public static Collection requestExceptions() {
+return Arrays.asList(new Object[][]{
+{new InterruptedException()},
+{new ExecutionException(null)},
+{new TimeoutException()}
+});
+}
+
+private static Request buildThrowingMockRequest(Throwable t) throws 
ExecutionException, InterruptedException, TimeoutException {
+Request req = mock(Request.class);
+when(req.send()).thenThrow(t);
+return req;
+}
+
+@Test
+public void testFailureDuringRequestCausesInternalServerError() throw

[GitHub] [kafka] elkkhan commented on a diff in pull request #12320: KAFKA-13702: Connect RestClient overrides response status code on request failure

2022-07-07 Thread GitBox


elkkhan commented on code in PR #12320:
URL: https://github.com/apache/kafka/pull/12320#discussion_r915637006


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestClientTest.java:
##
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime.rest;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.connect.runtime.rest.entities.ErrorMessage;
+import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
+import org.eclipse.jetty.client.HttpClient;
+import org.eclipse.jetty.client.api.ContentResponse;
+import org.eclipse.jetty.client.api.Request;
+import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import javax.crypto.SecretKey;
+import javax.ws.rs.core.Response;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Objects;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@RunWith(Enclosed.class)
+public class RestClientTest {
+
+private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+private static final TypeReference TEST_TYPE = new 
TypeReference() {
+};
+private static final SecretKey MOCK_SECRET_KEY = getMockSecretKey();
+
+private static void assertIsInternalServerError(ConnectRestException e) {
+assertEquals(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), 
e.statusCode());
+assertEquals(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), 
e.errorCode());
+}
+
+private static SecretKey getMockSecretKey() {
+SecretKey mockKey = mock(SecretKey.class);
+when(mockKey.getFormat()).thenReturn("RAW");// supported format by
+
when(mockKey.getEncoded()).thenReturn("SomeKey".getBytes(StandardCharsets.UTF_8));
+return mockKey;
+}
+
+private static RestClient.HttpResponse httpRequest(HttpClient 
httpClient, String requestSignatureAlgorithm) {
+return RestClient.httpRequest(
+httpClient,
+"https://localhost:1234/api/endpoint";,
+"GET",
+null,
+new TestDTO("requestBodyData"),
+TEST_TYPE,
+MOCK_SECRET_KEY,
+requestSignatureAlgorithm);
+}
+
+private static RestClient.HttpResponse httpRequest(HttpClient 
httpClient) {
+String validRequestSignatureAlgorithm = "HmacSHA1";
+return httpRequest(httpClient, validRequestSignatureAlgorithm);
+}
+
+
+@RunWith(Parameterized.class)
+public static class RequestFailureParameterizedTest {
+private final HttpClient httpClient = mock(HttpClient.class);

Review Comment:
   nice, thanks - didn't know this. fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] elkkhan commented on a diff in pull request #12320: KAFKA-13702: Connect RestClient overrides response status code on request failure

2022-07-07 Thread GitBox


elkkhan commented on code in PR #12320:
URL: https://github.com/apache/kafka/pull/12320#discussion_r915628889


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestClientTest.java:
##
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime.rest;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.connect.runtime.rest.entities.ErrorMessage;
+import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
+import org.eclipse.jetty.client.HttpClient;
+import org.eclipse.jetty.client.api.ContentResponse;
+import org.eclipse.jetty.client.api.Request;
+import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import javax.crypto.SecretKey;
+import javax.ws.rs.core.Response;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Objects;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@RunWith(Enclosed.class)
+public class RestClientTest {
+
+private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+private static final TypeReference TEST_TYPE = new 
TypeReference() {
+};
+private static final SecretKey MOCK_SECRET_KEY = getMockSecretKey();
+
+private static void assertIsInternalServerError(ConnectRestException e) {
+assertEquals(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), 
e.statusCode());
+assertEquals(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), 
e.errorCode());
+}
+
+private static SecretKey getMockSecretKey() {
+SecretKey mockKey = mock(SecretKey.class);
+when(mockKey.getFormat()).thenReturn("RAW");// supported format by
+
when(mockKey.getEncoded()).thenReturn("SomeKey".getBytes(StandardCharsets.UTF_8));
+return mockKey;
+}
+
+private static RestClient.HttpResponse httpRequest(HttpClient 
httpClient, String requestSignatureAlgorithm) {
+return RestClient.httpRequest(
+httpClient,
+"https://localhost:1234/api/endpoint";,
+"GET",
+null,
+new TestDTO("requestBodyData"),
+TEST_TYPE,
+MOCK_SECRET_KEY,
+requestSignatureAlgorithm);
+}
+
+private static RestClient.HttpResponse httpRequest(HttpClient 
httpClient) {
+String validRequestSignatureAlgorithm = "HmacSHA1";
+return httpRequest(httpClient, validRequestSignatureAlgorithm);
+}
+
+
+@RunWith(Parameterized.class)
+public static class RequestFailureParameterizedTest {
+private final HttpClient httpClient = mock(HttpClient.class);
+
+@Parameterized.Parameter
+public Throwable requestException;
+
+@Parameterized.Parameters
+public static Collection requestExceptions() {
+return Arrays.asList(new Object[][]{
+{new InterruptedException()},
+{new ExecutionException(null)},
+{new TimeoutException()}
+});
+}
+
+private static Request buildThrowingMockRequest(Throwable t) throws 
ExecutionException, InterruptedException, TimeoutException {
+Request req = mock(Request.class);
+when(req.send()).thenThrow(t);

Review Comment:
   @C0urante I believe this is already in the latest commit, any chance you 
were referring to the previous comm

[jira] [Commented] (KAFKA-6221) ReplicaFetcherThread throws UnknownTopicOrPartitionException on topic creation

2022-07-07 Thread Hang HOU (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-6221?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17563626#comment-17563626
 ] 

Hang HOU commented on KAFKA-6221:
-

I met this problem again in Kafka 0.10.0. One more thing: if I create some 
topics, each with only one partition, and then send 3 million records to every 
topic, some topics never lose data when producing, but the rest consistently 
lose about 1/7 of the total data :(

> ReplicaFetcherThread throws UnknownTopicOrPartitionException on topic 
> creation 
> ---
>
> Key: KAFKA-6221
> URL: https://issues.apache.org/jira/browse/KAFKA-6221
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.1.1, 0.10.2.0, 0.10.2.1, 0.11.0.1, 1.0.0
> Environment: RHEL 7
>Reporter: Alex Dunayevsky
>Priority: Minor
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> This issue appeared to happen frequently on 0.10.2.0. 
> On 0.10.2.1 and 1.0.0 it's a way harder to reproduce. 
> We'll focus on reproducing it on 0.10.2.1 and 1.0.0.
> *TOPOLOGY:* 
>   3 brokers, 1 zk.
> *REPRODUCING STRATEGY:* 
> Create a few dozens topics (say, 40) one by one, each with replication factor 
> 2. Number of partitions, generally, does not matter but, for easier 
> reproduction, should not be too small (around 30 or so). 
> *CREATE 40 TOPICS:*
> {code:java} for i in {1..40}; do bin/kafka-topics.sh --create --topic 
> "topic${i}_p28_r2" --partitions 28 --replication-factor 2 --zookeeper :2165; 
> done {code}
> *ERRORS*
> {code:java}
> *BROKER 1*
> [2017-11-15 16:46:00,853] ERROR [ReplicaFetcherThread-0-2], Error for 
> partition [topic1_p28_r2,27] to broker 
> 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This 
> server does not host this topic-partition. (kafka.server.ReplicaFetcherThread)
> [2017-11-15 16:46:00,853] ERROR [ReplicaFetcherThread-0-2], Error for 
> partition [topic1_p28_r2,27] to broker 
> 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This 
> server does not host this topic-partition. (kafka.server.ReplicaFetcherThread)
> [2017-11-15 16:46:00,854] ERROR [ReplicaFetcherThread-0-2], Error for 
> partition [topic1_p28_r2,9] to broker 
> 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This 
> server does not host this topic-partition. (kafka.server.ReplicaFetcherThread)
> [2017-11-15 16:46:00,854] ERROR [ReplicaFetcherThread-0-2], Error for 
> partition [topic1_p28_r2,9] to broker 
> 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This 
> server does not host this topic-partition. (kafka.server.ReplicaFetcherThread)
> [2017-11-15 16:46:00,854] ERROR [ReplicaFetcherThread-0-2], Error for 
> partition [topic1_p28_r2,3] to broker 
> 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This 
> server does not host this topic-partition. (kafka.server.ReplicaFetcherThread)
> [2017-11-15 16:46:00,854] ERROR [ReplicaFetcherThread-0-2], Error for 
> partition [topic1_p28_r2,3] to broker 
> 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This 
> server does not host this topic-partition. (kafka.server.ReplicaFetcherThread)
> [2017-11-15 16:46:00,854] ERROR [ReplicaFetcherThread-0-2], Error for 
> partition [topic1_p28_r2,15] to broker 
> 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This 
> server does not host this topic-partition. (kafka.server.ReplicaFetcherThread)
> [2017-11-15 16:46:00,854] ERROR [ReplicaFetcherThread-0-2], Error for 
> partition [topic1_p28_r2,15] to broker 
> 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This 
> server does not host this topic-partition. (kafka.server.ReplicaFetcherThread)
> [2017-11-15 16:46:00,854] ERROR [ReplicaFetcherThread-0-2], Error for 
> partition [topic1_p28_r2,21] to broker 
> 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This 
> server does not host this topic-partition. (kafka.server.ReplicaFetcherThread)
> [2017-11-15 16:46:00,854] ERROR [ReplicaFetcherThread-0-2], Error for 
> partition [topic1_p28_r2,21] to broker 
> 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This 
> server does not host this topic-partition. (kafka.server.ReplicaFetcherThread)
> *BROKER 2*
> [2017-11-15 16:46:36,408] ERROR [ReplicaFetcherThread-0-3], Error for 
> partition [topic20_p28_r2,12] to broker 
> 3:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This 
> server does not host this topic-partition. (kafka.server.ReplicaFetcherThread)
> [2017-11-15 16:46:36,408] ERROR [ReplicaFetcherThread-0-3], Error for 
> partition [topic20_p28_r2,12] to broker 
> 3:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This 
> server does not host this topic-partition. 

[GitHub] [kafka] showuon commented on pull request #12381: KAFKA-13474: Allow reconfiguration of SSL certs for broker to controller connection

2022-07-07 Thread GitBox


showuon commented on PR #12381:
URL: https://github.com/apache/kafka/pull/12381#issuecomment-1177230649

   Sorry, looks like the `match` still needs a default case:
   ```
   [Error] 
/home/jenkins/workspace/Kafka_kafka-pr_PR-12381/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scalaː190ː7:
 match may not be exhaustive.
   
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dengziming commented on pull request #12274: KAFKA-13959: Controller should unfence Broker with busy metadata log

2022-07-07 Thread GitBox


dengziming commented on PR #12274:
URL: https://github.com/apache/kafka/pull/12274#issuecomment-1177222867

   > Do you have a diff for this solution?
   @jsancio I haven't fleshed it out yet; the basic idea is shown in this PR: 
only read up to `Isolation.COMMITTED` when reading from an observer, and invoke 
`fetchPurgatory.maybeComplete(highWatermark.offset, currentTimeMs)` in 
`onUpdateLeaderHighWatermark`.
   
   
   
   > maybe we can try the original proposed one?
   @showuon That solution only takes effect if the problem is that the 
heartbeat period is much bigger than the NoOpRecord interval; it may not work 
now, since the main problem comes from `RaftClient.fetchPurgatory`, so we 
should try a new solution.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] zigarn commented on pull request #12175: KIP-840: Config file option for MessageReader/MessageFormatter in ConsoleProducer/ConsoleConsumer

2022-07-07 Thread GitBox


zigarn commented on PR #12175:
URL: https://github.com/apache/kafka/pull/12175#issuecomment-1177208913

   @dajac KIP is now accepted. Next step is to validate this PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] zigarn commented on a diff in pull request #12175: KIP-840: Config file option for MessageReader/MessageFormatter in ConsoleProducer/ConsoleConsumer

2022-07-07 Thread GitBox


zigarn commented on code in PR #12175:
URL: https://github.com/apache/kafka/pull/12175#discussion_r915565817


##
core/src/main/scala/kafka/tools/ConsoleConsumer.scala:
##
@@ -248,6 +248,10 @@ object ConsoleConsumer extends Logging {
   .withRequiredArg
   .describedAs("prop")
   .ofType(classOf[String])
+val messageFormatterConfigOpt = parser.accepts("config", s"Config 
properties file to initialize the message formatter. Note that 
$messageFormatterArgOpt takes precedence over this config.")

Review Comment:
   `--consumer.config` is used to set up the consumer, whereas this option is 
used to set up the message formatter.
   It's the same distinction as between `--consumer-property` and `--property`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] etolbakov opened a new pull request, #12388: KAFKA-14013: Limit the length of the `reason` field sent on the wire

2022-07-07 Thread GitBox


etolbakov opened a new pull request, #12388:
URL: https://github.com/apache/kafka/pull/12388

   Hello David @dajac ,
   I found an open JIRA ticket - 
https://issues.apache.org/jira/browse/KAFKA-14013
   and decided to come up with a suggestion.
   Could you please take a look if you have a spare minute?
   Happy to adjust the changes if that's required 
   
   --
   Regards, Eugene 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac merged pull request #12382: MINOR: kafka system tests should support larger EBS volumes for newer instances

2022-07-07 Thread GitBox


dajac merged PR #12382:
URL: https://github.com/apache/kafka/pull/12382


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dengziming commented on a diff in pull request #12265: KAFKA-13968: Fix 3 major bugs of KRaft snapshot generating

2022-07-07 Thread GitBox


dengziming commented on code in PR #12265:
URL: https://github.com/apache/kafka/pull/12265#discussion_r915532837


##
core/src/main/scala/kafka/server/metadata/BrokerMetadataSnapshotter.scala:
##
@@ -45,26 +45,37 @@ class BrokerMetadataSnapshotter(
*/
   private var _currentSnapshotOffset = -1L
 
+  /**
+   * The offset of the newest snapshot, or -1 if there hasn't been one. 
Accessed only under
+   * the object lock.
+   */
+  private var _latestSnapshotOffset = -1L

Review Comment:
   This seems to be an existing problem. I checked the logic of 
`QuorumController.SnapshotGeneratorManager`: it checks the result of 
`raftClient.createSnapshot` and just skips it if it's empty. However, on the 
broker side, `BrokerSnapshotWriterBuilder` will throw an exception on an empty 
snapshot. How about making it consistent with the controller code?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org