[GitHub] [kafka] clolov commented on pull request #12285: KAFKA-14001: Migrate streams module to JUnit 5 - Part 1

2022-07-21 Thread GitBox


clolov commented on PR #12285:
URL: https://github.com/apache/kafka/pull/12285#issuecomment-1192206399

   I don't know, but I can have a look in the upcoming days.





[GitHub] [kafka] showuon commented on pull request #12347: KAFKA-13919: expose log recovery metrics

2022-07-21 Thread GitBox


showuon commented on PR #12347:
URL: https://github.com/apache/kafka/pull/12347#issuecomment-1192143782

   @jsancio , sorry, I just noticed that the feature freeze date has already passed. But
since this PR only adds metrics in the log manager, it should be fine to
backport to 3.3. Let me know if you disagree. Thank you.





[GitHub] [kafka] showuon commented on pull request #12347: KAFKA-13919: expose log recovery metrics

2022-07-21 Thread GitBox


showuon commented on PR #12347:
URL: https://github.com/apache/kafka/pull/12347#issuecomment-1192133846

   Backport to 3.3 branch. cc @jsancio 





[jira] [Updated] (KAFKA-13917) Avoid calling lookupCoordinator() in tight loop

2022-07-21 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13917?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-13917:
--
Fix Version/s: 3.2.1

> Avoid calling lookupCoordinator() in tight loop
> ---
>
> Key: KAFKA-13917
> URL: https://issues.apache.org/jira/browse/KAFKA-13917
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Affects Versions: 3.1.0, 3.1.1, 3.3.0, 3.1.2, 3.2.1
>Reporter: Viktor Somogyi-Vass
>Assignee: Viktor Somogyi-Vass
>Priority: Major
> Fix For: 3.3.0, 3.2.1
>
>
> Currently the heartbeat thread's lookupCoordinator() is called in a tight
> loop if brokers crash and the consumer is left running. Besides flooding the
> logs at debug level, this also increases CPU usage.
> The fix is easy: just put a backoff call after the coordinator lookup.
> Reproduction:
> # Start a few brokers
> # Create a topic and produce to it
> # Start consuming
> # Stop all brokers
> At this point lookupCoordinator() will be called in a tight loop.
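
For illustration only, here is a minimal sketch of the backoff idea described above. The names (`lookupCoordinator()`, `RETRY_BACKOFF_MS`) are placeholders and this is not the actual AbstractCoordinator code; the point is simply that sleeping between lookups keeps the loop from spinning.

{code:java}
// Hypothetical sketch: coordinator lookup with a backoff instead of a tight loop.
public final class CoordinatorLookupBackoff {
    private static final long RETRY_BACKOFF_MS = 100L; // assumed value, mirrors retry.backoff.ms

    /** Placeholder for the real lookup; returns true once a coordinator is found. */
    private static boolean lookupCoordinator() {
        return false; // no coordinator reachable in this toy example
    }

    /** Polls for the coordinator, sleeping between attempts so the loop cannot spin. */
    public static boolean awaitCoordinator(long timeoutMs) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            if (lookupCoordinator()) {
                return true;
            }
            Thread.sleep(RETRY_BACKOFF_MS); // the backoff that is missing in the tight loop
        }
        return false;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("coordinator found: " + awaitCoordinator(1_000L));
    }
}
{code}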





[GitHub] [kafka] showuon merged pull request #12347: KAFKA-13919: expose log recovery metrics

2022-07-21 Thread GitBox


showuon merged PR #12347:
URL: https://github.com/apache/kafka/pull/12347





[GitHub] [kafka] showuon commented on pull request #12347: KAFKA-13919: expose log recovery metrics

2022-07-21 Thread GitBox


showuon commented on PR #12347:
URL: https://github.com/apache/kafka/pull/12347#issuecomment-1192128184

   Failed tests are unrelated and also failed in trunk build:
   ```
   Build / JDK 8 and Scala 2.12 / 
kafka.server.DynamicBrokerReconfigurationTest.testKeyStoreAlter()
   Build / JDK 8 and Scala 2.12 / 
kafka.server.DynamicBrokerReconfigurationTest.testKeyStoreAlter()
   ```





[GitHub] [kafka] showuon commented on pull request #12417: KAFKA-13917: Avoid calling lookupCoordinator() in tight loop (#12180)

2022-07-21 Thread GitBox


showuon commented on PR #12417:
URL: https://github.com/apache/kafka/pull/12417#issuecomment-1192099004

   Sorry, I didn't notice it until now. Thanks for helping review and merge, @mumrah!





[GitHub] [kafka] artemlivshits commented on pull request #12365: KAFKA-14020: Performance regression in Producer

2022-07-21 Thread GitBox


artemlivshits commented on PR #12365:
URL: https://github.com/apache/kafka/pull/12365#issuecomment-1192047420

   Hi @etolbakov, making this method private sounds reasonable to me. Thank you
for the suggestion.
   





[GitHub] [kafka] mumrah merged pull request #12417: KAFKA-13917: Avoid calling lookupCoordinator() in tight loop (#12180)

2022-07-21 Thread GitBox


mumrah merged PR #12417:
URL: https://github.com/apache/kafka/pull/12417





[GitHub] [kafka] mumrah commented on pull request #12417: KAFKA-13917: Avoid calling lookupCoordinator() in tight loop (#12180)

2022-07-21 Thread GitBox


mumrah commented on PR #12417:
URL: https://github.com/apache/kafka/pull/12417#issuecomment-1192043608

   Looks like the test failures are unrelated





[GitHub] [kafka] cmccabe merged pull request #12396: KAFKA-14051: Create metrics reporters in KRaft remote controllers

2022-07-21 Thread GitBox


cmccabe merged PR #12396:
URL: https://github.com/apache/kafka/pull/12396





[GitHub] [kafka] lihaosky commented on a diff in pull request #12166: KAFKA-13817 Always sync nextTimeToEmit with wall clock

2022-07-21 Thread GitBox


lihaosky commented on code in PR #12166:
URL: https://github.com/apache/kafka/pull/12166#discussion_r927167240


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##
@@ -226,9 +226,10 @@ private void emitNonJoinedOuterRecords(
 if (internalProcessorContext.currentSystemTimeMs() < 
sharedTimeTracker.nextTimeToEmit) {
 return;
 }
-if (sharedTimeTracker.nextTimeToEmit == 0) {
-sharedTimeTracker.nextTimeToEmit = 
internalProcessorContext.currentSystemTimeMs();
-}
+
+// Ensure `nextTimeToEmit` is synced with `currentSystemTimeMs`, 
if we dont set it everytime,
+// they can get out of sync during a clock drift
+sharedTimeTracker.nextTimeToEmit = 
internalProcessorContext.currentSystemTimeMs();

Review Comment:
   I'm ok with the comment 
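
For context, a tiny self-contained model of the throttling logic under discussion (a hypothetical class, not the Streams code): re-syncing `nextTimeToEmit` on every call keeps it anchored to the current wall clock, so a clock jump cannot leave the tracker stranded far in the future.

```java
import java.util.function.LongSupplier;

// Hypothetical model of the emit throttle in KStreamKStreamJoin (not the actual class).
final class EmitThrottle {
    private final LongSupplier wallClockMs; // injected clock, e.g. System::currentTimeMillis
    private final long emitIntervalMs;
    private long nextTimeToEmit = 0L;

    EmitThrottle(LongSupplier wallClockMs, long emitIntervalMs) {
        this.wallClockMs = wallClockMs;
        this.emitIntervalMs = emitIntervalMs;
    }

    /** Returns true when an emit is due; always re-syncs nextTimeToEmit to the current clock. */
    boolean maybeEmit() {
        long now = wallClockMs.getAsLong();
        if (now < nextTimeToEmit) {
            return false;
        }
        // Re-sync on every emit (the fix): if the clock drifts, nextTimeToEmit follows it
        // instead of keeping a stale value that could suppress emits for a long time.
        nextTimeToEmit = now + emitIntervalMs;
        return true;
    }

    public static void main(String[] args) {
        EmitThrottle throttle = new EmitThrottle(System::currentTimeMillis, 1000L);
        System.out.println(throttle.maybeEmit()); // true: first call always emits
        System.out.println(throttle.maybeEmit()); // false: still within the 1s interval
    }
}
```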






[GitHub] [kafka] lihaosky commented on a diff in pull request #12166: KAFKA-13817 Always sync nextTimeToEmit with wall clock

2022-07-21 Thread GitBox


lihaosky commented on code in PR #12166:
URL: https://github.com/apache/kafka/pull/12166#discussion_r927166898


##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java:
##
@@ -333,6 +352,87 @@ public void shouldJoinWithCustomStoreSuppliers() {
 runJoin(streamJoined.withOtherStoreSupplier(otherStoreSupplier), 
joinWindows);
 }
 
+@Test
+public void shouldThrottleEmitNonJoinedOuterRecordsEvenWhenClockDrift() {

Review Comment:
   Hi @qingwei91 , thanks for the fix and the great test coverage! Regarding test
complexity, could you do something similar to
https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java#L768
 to test time drift? Instead of mocking low-level stores, could you check the
final results?






[jira] [Commented] (KAFKA-14089) Flaky ExactlyOnceSourceIntegrationTest.testSeparateOffsetsTopic

2022-07-21 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17569715#comment-17569715
 ] 

Chris Egerton commented on KAFKA-14089:
---

Thanks Mickael. I put together a draft fix
[here|https://github.com/apache/kafka/pull/12429], although I still haven't
been able to replicate the failure locally. If you have time, would you mind
giving it a try and seeing if it has positive effects in your environment? I can
also kick off several Jenkins builds by re-triggering CI runs, although that
will be more time-consuming as it will run the build for the whole project
instead of just Connect.

> Flaky ExactlyOnceSourceIntegrationTest.testSeparateOffsetsTopic
> ---
>
> Key: KAFKA-14089
> URL: https://issues.apache.org/jira/browse/KAFKA-14089
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.3.0
>Reporter: Mickael Maison
>Assignee: Chris Egerton
>Priority: Major
> Attachments: failure.txt, 
> org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest.testSeparateOffsetsTopic.test.stdout
>
>
> It looks like the sequence got broken around "65535, 65537, 65536, 65539, 
> 65538, 65541, 65540, 65543"





[GitHub] [kafka] C0urante opened a new pull request, #12429: KAFKA-14089: Only check for committed seqnos after disabling exactly-once support in Connect integration test

2022-07-21 Thread GitBox


C0urante opened a new pull request, #12429:
URL: https://github.com/apache/kafka/pull/12429

   [Jira](https://issues.apache.org/jira/browse/KAFKA-14089)
   
   This is a potential fix for the flakiness in the
`ExactlyOnceSourceIntegrationTest::testSeparateOffsetsTopic` test. There are also
a few minor improvements to prevent unnecessary `ERROR`-level log messages for
shutdown of cancelled exactly-once source tasks, and unnecessary `WARN`-level
log messages when creating exactly-once source task producers.
   
   This fix should make the test resilient to unclean task and worker shutdown 
by (as the title indicates) only verifying data emitted by the source tasks up 
to the latest-committed offset; data after that point may exist in the topic 
written to by the task, but does not have to be accurate in order to retain the 
at-least-once delivery guarantees provided when exactly-once support is 
disabled.
   
   Full disclosure: I haven't encountered any failures while running this 
locally, but I also haven't been able to replicate the failures described in 
the ticket, either.
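
As a rough illustration of the verification approach described above (all names are hypothetical, this is not the actual integration-test code): only records whose sequence number is at or below the latest committed seqno need to be verified; anything newer may or may not have landed in the topic after an unclean shutdown.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch of capping verification at the latest committed sequence number.
final class SeqnoVerification {
    static List<Long> recordsToVerify(List<Long> consumedSeqnos, long latestCommittedSeqno) {
        return consumedSeqnos.stream()
                .filter(seqno -> seqno <= latestCommittedSeqno) // ignore uncommitted tail
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Long> consumed = List.of(1L, 2L, 3L, 4L, 5L, 6L);
        long latestCommitted = 4L;
        // Only seqnos 1..4 must be present and ordered; 5 and 6 are beyond the last commit.
        System.out.println(recordsToVerify(consumed, latestCommitted));
    }
}
```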
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] badaiaqrandista commented on a diff in pull request #12416: KAFKA-13559: Fix issue where responses intermittently takes 300+ ms to respond, even when the server is idle.

2022-07-21 Thread GitBox


badaiaqrandista commented on code in PR #12416:
URL: https://github.com/apache/kafka/pull/12416#discussion_r927148782


##
clients/src/main/java/org/apache/kafka/common/network/Selector.java:
##
@@ -591,6 +591,11 @@ void pollSelectionKeys(Set selectionKeys,
 long nowNanos = channelStartTimeNanos != 0 ? 
channelStartTimeNanos : currentTimeNanos;
 try {
 attemptWrite(key, channel, nowNanos);
+
+// Following is to fix KAFKA-13559. This will prevent 
poll() from blocking for 300 ms when the
+// socket has no data but the buffer has data. Only 
happens when using SSL.
+if (channel.hasBytesBuffered())
+madeReadProgressLastPoll = true;

Review Comment:
   @splett2 
   
   The `Selector` method that gets called after the channel is unmuted is actually
`clearCompletedSends()`
([here](https://github.com/apache/kafka/blob/a6c036256504b2549b009c10c777927a40a4bc34/clients/src/main/java/org/apache/kafka/common/network/Selector.java#L825-L827)).
   
   A few alternatives I can think of:
   
   1. Move `madeReadProgressLastPoll=true` to this method 
(`Selector.clearCompletedSends()`).
   2. Create a new `Selector` method named `readyForNextRead()` (or something) 
and call it from `SocketServer.processCompletedSends()`.
   
   But these alternatives will require looping over all the channels, which 
will not be efficient.
   
   What do you think?






[GitHub] [kafka] badaiaqrandista commented on a diff in pull request #12416: KAFKA-13559: Fix issue where responses intermittently takes 300+ ms to respond, even when the server is idle.

2022-07-21 Thread GitBox


badaiaqrandista commented on code in PR #12416:
URL: https://github.com/apache/kafka/pull/12416#discussion_r927148782


##
clients/src/main/java/org/apache/kafka/common/network/Selector.java:
##
@@ -591,6 +591,11 @@ void pollSelectionKeys(Set selectionKeys,
 long nowNanos = channelStartTimeNanos != 0 ? 
channelStartTimeNanos : currentTimeNanos;
 try {
 attemptWrite(key, channel, nowNanos);
+
+// Following is to fix KAFKA-13559. This will prevent 
poll() from blocking for 300 ms when the
+// socket has no data but the buffer has data. Only 
happens when using SSL.
+if (channel.hasBytesBuffered())
+madeReadProgressLastPoll = true;

Review Comment:
   @splett2 
   
   The `Selector` method that gets called after the channel is unmuted is actually
`clearCompletedSends()`
([here](https://github.com/apache/kafka/blob/a6c036256504b2549b009c10c777927a40a4bc34/clients/src/main/java/org/apache/kafka/common/network/Selector.java#L825-L827)).
   
   A few alternatives I can think of:
   
   1. Move `madeReadProgressLastPoll=true` to this method 
(`Selector.clearCompletedSends()`).
   2. Create a new `Selector` method named `readyForNextRead()` (or something) 
and call it from `SocketServer.processCompletedSends()`.
   
   What do you think?






[GitHub] [kafka] badaiaqrandista commented on a diff in pull request #12416: KAFKA-13559: Fix issue where responses intermittently takes 300+ ms to respond, even when the server is idle.

2022-07-21 Thread GitBox


badaiaqrandista commented on code in PR #12416:
URL: https://github.com/apache/kafka/pull/12416#discussion_r927148782


##
clients/src/main/java/org/apache/kafka/common/network/Selector.java:
##
@@ -591,6 +591,11 @@ void pollSelectionKeys(Set selectionKeys,
 long nowNanos = channelStartTimeNanos != 0 ? 
channelStartTimeNanos : currentTimeNanos;
 try {
 attemptWrite(key, channel, nowNanos);
+
+// Following is to fix KAFKA-13559. This will prevent 
poll() from blocking for 300 ms when the
+// socket has no data but the buffer has data. Only 
happens when using SSL.
+if (channel.hasBytesBuffered())
+madeReadProgressLastPoll = true;

Review Comment:
   @splett2 
   
   The `Selector` method that gets called after the channel is unmuted is actually
`clearCompletedSends()`
([here](https://github.com/apache/kafka/blob/a6c036256504b2549b009c10c777927a40a4bc34/clients/src/main/java/org/apache/kafka/common/network/Selector.java#L825-L827)).
   
   A few alternatives I can think of:
   
   1. Move `madeReadProgressLastPoll=true` to this method 
(`Selector.clearCompletedSends()`).
   2. Create a new `Selector` method named `readyForNextRead()` (or something) 
and call it from `SocketServer.processCompletedSends()`.
   3. Not setting `madeReadProgressLastPoll=false` if 
`channel.hasBytesBuffered()` is true in `clear()` 
([here](https://github.com/apache/kafka/blob/a6c036256504b2549b009c10c777927a40a4bc34/clients/src/main/java/org/apache/kafka/common/network/Selector.java#L862)).
   
   What do you think?






[GitHub] [kafka] badaiaqrandista commented on a diff in pull request #12416: KAFKA-13559: Fix issue where responses intermittently takes 300+ ms to respond, even when the server is idle.

2022-07-21 Thread GitBox


badaiaqrandista commented on code in PR #12416:
URL: https://github.com/apache/kafka/pull/12416#discussion_r927123903


##
core/src/test/scala/unit/kafka/network/SocketServerTest.scala:
##
@@ -1878,6 +1878,98 @@ class SocketServerTest {
 }, false)
   }
 
+  /**
+   * Tests for KAFKA-13559 - Processing request got delayed by 300 ms in the 
following condition:
+   * 1. Client-Server communication uses SSL socket.
+   * 2. More than one requests are read from the socket into 
SslTransportLayer.netReadBuffer.
+   *
+   * This 300 ms delay occurs because the socket has no data but the buffer 
has data. And the sequence of events that
+   * leads to this situation is the following (from the server point of view):
+   *
+   * Step 1 - SslTransportLayer receives more than one requests in the socket 
and put it in the buffer
+   *  (SslTransportLayer.netReadBuffer).
+   * Step 2 - SslTransportLayer reads all of the bytes and stores it in 
SslTransportLayer.appReadBuffer.
+   * Step 3 - Process the first request, leaving the second request in 
SslTransportLayer.appReadBuffer.
+   * Step 4 - THIS IS WHERE THE DELAY IS. Process the second request. This 
request is read from
+   * SslTransportLayer.appReadBuffer, instead of the socket. Because of this, 
"select(timeout)" in Selector.poll()
+   * should not block for 300 ms.
+   *
+   * This test is implemented following "makeSocketWithBufferedRequests()" 
method by putting two requests directly
+   * into SslTransportLayer.netReadBuffer and manually trigger the processing.
+   *
+   */
+  @Test
+  def testLatencyWithBufferedDataAndNoSocketData(): Unit = {
+shutdownServerAndMetrics(server)
+
+// create server with SSL listener
+val testableServer = new 
TestableSocketServer(KafkaConfig.fromProps(sslServerProps))
+testableServer.enableRequestProcessing(Map.empty)
+val testableSelector = testableServer.testableSelector
+val proxyServer = new ProxyServer(testableServer)
+val selectTimeout = 5000  // in ms
+// set pollTimeoutOverride to "selectTimeout" to ensure poll() timeout is 
distinct and can be identified
+testableSelector.pollTimeoutOverride = Some(selectTimeout)
+
+try {
+  // trigger SSL handshake by sending the first request and receiving its 
response without buffering
+  val requestBytes = producerRequestBytes()
+  val sslSocket = sslClientSocket(proxyServer.localPort)
+
+  sendRequest(sslSocket, requestBytes)
+  val request1 = receiveRequest(testableServer.dataPlaneRequestChannel)
+  processRequest(testableServer.dataPlaneRequestChannel, request1)
+  receiveResponse(sslSocket)
+
+  // then put 2 requests in SslTransportLayer.netReadBuffer via the 
ProxyServer
+  val connectionId = request1.context.connectionId
+  val listener = 
testableServer.config.dataPlaneListeners.head.listenerName.value
+  val channel = 
testableServer.dataPlaneAcceptor(listener).get.processors(0).channel(connectionId).getOrElse(throw
 new IllegalStateException("Channel not found"))
+  val transportLayer: SslTransportLayer = JTestUtils.fieldValue(channel, 
classOf[KafkaChannel], "transportLayer")
+  val netReadBuffer: ByteBuffer = JTestUtils.fieldValue(transportLayer, 
classOf[SslTransportLayer], "netReadBuffer")
+
+  proxyServer.enableBuffering(netReadBuffer)
+  sendRequest(sslSocket, requestBytes)
+  sendRequest(sslSocket, requestBytes)
+
+  val keysWithBufferedRead: util.Set[SelectionKey] = 
JTestUtils.fieldValue(testableSelector, classOf[Selector], 
"keysWithBufferedRead")
+  keysWithBufferedRead.add(channel.selectionKey)
+  JTestUtils.setFieldValue(transportLayer, "hasBytesBuffered", true)
+
+  // process the first request in the server side
+  // this would move bytes from netReadBuffer to appReadBuffer, then 
process only the first request
+  // we call wakeup() so Selector.poll() does not block in this step 
(because we artificially add data into netReadBuffer)
+  testableSelector.wakeup()
+  val req1 = receiveRequest(testableServer.dataPlaneRequestChannel, 
selectTimeout + 1000)
+  processRequest(testableServer.dataPlaneRequestChannel, req1)
+
+  // receive response in the client side
+  receiveResponse(sslSocket)
+
+  // process the second request in the server side
+  // this would process the second request in the appReadBuffer
+  // NOTE 1: this should not block because the data is already in the 
buffer, but without the fix for KAFKA-13559,
+  // this step will block for 300 ms (in this test, we override poll() 
timeout to 5000 ms to make it distinct)
+  // NOTE 2: we do not call wakeup() here so Selector.poll() would block 
if the fix is not in place
+  val processTimeStart = System.nanoTime()  // using nanoTime() because it 
is meant to calculate elapsed time
+  val req2 = receiveRequest(testableServer.dataPlaneRequestChannel, 
selectTimeout + 1000)
+  

[GitHub] [kafka] badaiaqrandista commented on a diff in pull request #12416: KAFKA-13559: Fix issue where responses intermittently takes 300+ ms to respond, even when the server is idle.

2022-07-21 Thread GitBox


badaiaqrandista commented on code in PR #12416:
URL: https://github.com/apache/kafka/pull/12416#discussion_r927123562


##
core/src/test/scala/unit/kafka/network/SocketServerTest.scala:
##
@@ -1878,6 +1878,98 @@ class SocketServerTest {
 }, false)
   }
 
+  /**
+   * Tests for KAFKA-13559 - Processing request got delayed by 300 ms in the 
following condition:
+   * 1. Client-Server communication uses SSL socket.
+   * 2. More than one requests are read from the socket into 
SslTransportLayer.netReadBuffer.
+   *
+   * This 300 ms delay occurs because the socket has no data but the buffer 
has data. And the sequence of events that
+   * leads to this situation is the following (from the server point of view):
+   *
+   * Step 1 - SslTransportLayer receives more than one requests in the socket 
and put it in the buffer
+   *  (SslTransportLayer.netReadBuffer).
+   * Step 2 - SslTransportLayer reads all of the bytes and stores it in 
SslTransportLayer.appReadBuffer.
+   * Step 3 - Process the first request, leaving the second request in 
SslTransportLayer.appReadBuffer.
+   * Step 4 - THIS IS WHERE THE DELAY IS. Process the second request. This 
request is read from
+   * SslTransportLayer.appReadBuffer, instead of the socket. Because of this, 
"select(timeout)" in Selector.poll()
+   * should not block for 300 ms.
+   *
+   * This test is implemented following "makeSocketWithBufferedRequests()" 
method by putting two requests directly
+   * into SslTransportLayer.netReadBuffer and manually trigger the processing.
+   *
+   */
+  @Test
+  def testLatencyWithBufferedDataAndNoSocketData(): Unit = {
+shutdownServerAndMetrics(server)
+
+// create server with SSL listener
+val testableServer = new 
TestableSocketServer(KafkaConfig.fromProps(sslServerProps))
+testableServer.enableRequestProcessing(Map.empty)
+val testableSelector = testableServer.testableSelector
+val proxyServer = new ProxyServer(testableServer)
+val selectTimeout = 5000  // in ms
+// set pollTimeoutOverride to "selectTimeout" to ensure poll() timeout is 
distinct and can be identified
+testableSelector.pollTimeoutOverride = Some(selectTimeout)
+
+try {
+  // trigger SSL handshake by sending the first request and receiving its 
response without buffering
+  val requestBytes = producerRequestBytes()
+  val sslSocket = sslClientSocket(proxyServer.localPort)
+
+  sendRequest(sslSocket, requestBytes)
+  val request1 = receiveRequest(testableServer.dataPlaneRequestChannel)
+  processRequest(testableServer.dataPlaneRequestChannel, request1)
+  receiveResponse(sslSocket)
+
+  // then put 2 requests in SslTransportLayer.netReadBuffer via the 
ProxyServer
+  val connectionId = request1.context.connectionId
+  val listener = 
testableServer.config.dataPlaneListeners.head.listenerName.value
+  val channel = 
testableServer.dataPlaneAcceptor(listener).get.processors(0).channel(connectionId).getOrElse(throw
 new IllegalStateException("Channel not found"))
+  val transportLayer: SslTransportLayer = JTestUtils.fieldValue(channel, 
classOf[KafkaChannel], "transportLayer")
+  val netReadBuffer: ByteBuffer = JTestUtils.fieldValue(transportLayer, 
classOf[SslTransportLayer], "netReadBuffer")
+
+  proxyServer.enableBuffering(netReadBuffer)
+  sendRequest(sslSocket, requestBytes)
+  sendRequest(sslSocket, requestBytes)
+
+  val keysWithBufferedRead: util.Set[SelectionKey] = 
JTestUtils.fieldValue(testableSelector, classOf[Selector], 
"keysWithBufferedRead")
+  keysWithBufferedRead.add(channel.selectionKey)
+  JTestUtils.setFieldValue(transportLayer, "hasBytesBuffered", true)
+
+  // process the first request in the server side
+  // this would move bytes from netReadBuffer to appReadBuffer, then 
process only the first request
+  // we call wakeup() so Selector.poll() does not block in this step 
(because we artificially add data into netReadBuffer)
+  testableSelector.wakeup()
+  val req1 = receiveRequest(testableServer.dataPlaneRequestChannel, 
selectTimeout + 1000)
+  processRequest(testableServer.dataPlaneRequestChannel, req1)
+
+  // receive response in the client side
+  receiveResponse(sslSocket)
+
+  // process the second request in the server side
+  // this would process the second request in the appReadBuffer
+  // NOTE 1: this should not block because the data is already in the 
buffer, but without the fix for KAFKA-13559,
+  // this step will block for 300 ms (in this test, we override poll() 
timeout to 5000 ms to make it distinct)
+  // NOTE 2: we do not call wakeup() here so Selector.poll() would block 
if the fix is not in place
+  val processTimeStart = System.nanoTime()  // using nanoTime() because it 
is meant to calculate elapsed time
+  val req2 = receiveRequest(testableServer.dataPlaneRequestChannel, 
selectTimeout + 1000)

Review Comment:
   

[GitHub] [kafka] badaiaqrandista commented on a diff in pull request #12416: KAFKA-13559: Fix issue where responses intermittently takes 300+ ms to respond, even when the server is idle.

2022-07-21 Thread GitBox


badaiaqrandista commented on code in PR #12416:
URL: https://github.com/apache/kafka/pull/12416#discussion_r927123679


##
core/src/test/scala/unit/kafka/network/SocketServerTest.scala:
##
@@ -1878,6 +1878,98 @@ class SocketServerTest {
 }, false)
   }
 
+  /**
+   * Tests for KAFKA-13559 - Processing request got delayed by 300 ms in the 
following condition:
+   * 1. Client-Server communication uses SSL socket.
+   * 2. More than one requests are read from the socket into 
SslTransportLayer.netReadBuffer.
+   *
+   * This 300 ms delay occurs because the socket has no data but the buffer 
has data. And the sequence of events that
+   * leads to this situation is the following (from the server point of view):
+   *
+   * Step 1 - SslTransportLayer receives more than one requests in the socket 
and put it in the buffer
+   *  (SslTransportLayer.netReadBuffer).
+   * Step 2 - SslTransportLayer reads all of the bytes and stores it in 
SslTransportLayer.appReadBuffer.
+   * Step 3 - Process the first request, leaving the second request in 
SslTransportLayer.appReadBuffer.
+   * Step 4 - THIS IS WHERE THE DELAY IS. Process the second request. This 
request is read from
+   * SslTransportLayer.appReadBuffer, instead of the socket. Because of this, 
"select(timeout)" in Selector.poll()
+   * should not block for 300 ms.
+   *
+   * This test is implemented following "makeSocketWithBufferedRequests()" 
method by putting two requests directly
+   * into SslTransportLayer.netReadBuffer and manually trigger the processing.
+   *
+   */
+  @Test
+  def testLatencyWithBufferedDataAndNoSocketData(): Unit = {
+shutdownServerAndMetrics(server)
+
+// create server with SSL listener
+val testableServer = new 
TestableSocketServer(KafkaConfig.fromProps(sslServerProps))
+testableServer.enableRequestProcessing(Map.empty)
+val testableSelector = testableServer.testableSelector
+val proxyServer = new ProxyServer(testableServer)
+val selectTimeout = 5000  // in ms
+// set pollTimeoutOverride to "selectTimeout" to ensure poll() timeout is 
distinct and can be identified
+testableSelector.pollTimeoutOverride = Some(selectTimeout)
+
+try {
+  // trigger SSL handshake by sending the first request and receiving its 
response without buffering
+  val requestBytes = producerRequestBytes()
+  val sslSocket = sslClientSocket(proxyServer.localPort)
+
+  sendRequest(sslSocket, requestBytes)
+  val request1 = receiveRequest(testableServer.dataPlaneRequestChannel)
+  processRequest(testableServer.dataPlaneRequestChannel, request1)
+  receiveResponse(sslSocket)
+
+  // then put 2 requests in SslTransportLayer.netReadBuffer via the 
ProxyServer
+  val connectionId = request1.context.connectionId
+  val listener = 
testableServer.config.dataPlaneListeners.head.listenerName.value
+  val channel = 
testableServer.dataPlaneAcceptor(listener).get.processors(0).channel(connectionId).getOrElse(throw
 new IllegalStateException("Channel not found"))
+  val transportLayer: SslTransportLayer = JTestUtils.fieldValue(channel, 
classOf[KafkaChannel], "transportLayer")
+  val netReadBuffer: ByteBuffer = JTestUtils.fieldValue(transportLayer, 
classOf[SslTransportLayer], "netReadBuffer")
+
+  proxyServer.enableBuffering(netReadBuffer)
+  sendRequest(sslSocket, requestBytes)
+  sendRequest(sslSocket, requestBytes)
+
+  val keysWithBufferedRead: util.Set[SelectionKey] = 
JTestUtils.fieldValue(testableSelector, classOf[Selector], 
"keysWithBufferedRead")
+  keysWithBufferedRead.add(channel.selectionKey)
+  JTestUtils.setFieldValue(transportLayer, "hasBytesBuffered", true)
+
+  // process the first request in the server side
+  // this would move bytes from netReadBuffer to appReadBuffer, then 
process only the first request
+  // we call wakeup() so Selector.poll() does not block in this step 
(because we artificially add data into netReadBuffer)
+  testableSelector.wakeup()
+  val req1 = receiveRequest(testableServer.dataPlaneRequestChannel, 
selectTimeout + 1000)
+  processRequest(testableServer.dataPlaneRequestChannel, req1)
+
+  // receive response in the client side
+  receiveResponse(sslSocket)
+
+  // process the second request in the server side
+  // this would process the second request in the appReadBuffer
+  // NOTE 1: this should not block because the data is already in the 
buffer, but without the fix for KAFKA-13559,
+  // this step will block for 300 ms (in this test, we override poll() 
timeout to 5000 ms to make it distinct)
+  // NOTE 2: we do not call wakeup() here so Selector.poll() would block 
if the fix is not in place
+  val processTimeStart = System.nanoTime()  // using nanoTime() because it 
is meant to calculate elapsed time
+  val req2 = receiveRequest(testableServer.dataPlaneRequestChannel, 
selectTimeout + 1000)
+  

[GitHub] [kafka] badaiaqrandista commented on a diff in pull request #12416: KAFKA-13559: Fix issue where responses intermittently takes 300+ ms to respond, even when the server is idle.

2022-07-21 Thread GitBox


badaiaqrandista commented on code in PR #12416:
URL: https://github.com/apache/kafka/pull/12416#discussion_r927098320


##
core/src/test/scala/unit/kafka/network/SocketServerTest.scala:
##
@@ -1878,6 +1878,98 @@ class SocketServerTest {
 }, false)
   }
 
+  /**
+   * Tests for KAFKA-13559 - Processing request got delayed by 300 ms in the 
following condition:
+   * 1. Client-Server communication uses SSL socket.
+   * 2. More than one requests are read from the socket into 
SslTransportLayer.netReadBuffer.
+   *
+   * This 300 ms delay occurs because the socket has no data but the buffer 
has data. And the sequence of events that
+   * leads to this situation is the following (from the server point of view):
+   *
+   * Step 1 - SslTransportLayer receives more than one requests in the socket 
and put it in the buffer
+   *  (SslTransportLayer.netReadBuffer).
+   * Step 2 - SslTransportLayer reads all of the bytes and stores it in 
SslTransportLayer.appReadBuffer.
+   * Step 3 - Process the first request, leaving the second request in 
SslTransportLayer.appReadBuffer.
+   * Step 4 - THIS IS WHERE THE DELAY IS. Process the second request. This 
request is read from
+   * SslTransportLayer.appReadBuffer, instead of the socket. Because of this, 
"select(timeout)" in Selector.poll()
+   * should not block for 300 ms.
+   *
+   * This test is implemented following "makeSocketWithBufferedRequests()" 
method by putting two requests directly
+   * into SslTransportLayer.netReadBuffer and manually trigger the processing.

Review Comment:
   Got it. Changing "300 ms" to "poll timeout".






[GitHub] [kafka] badaiaqrandista commented on a diff in pull request #12416: KAFKA-13559: Fix issue where responses intermittently takes 300+ ms to respond, even when the server is idle.

2022-07-21 Thread GitBox


badaiaqrandista commented on code in PR #12416:
URL: https://github.com/apache/kafka/pull/12416#discussion_r927095680


##
clients/src/main/java/org/apache/kafka/common/network/Selector.java:
##
@@ -591,6 +591,11 @@ void pollSelectionKeys(Set selectionKeys,
 long nowNanos = channelStartTimeNanos != 0 ? 
channelStartTimeNanos : currentTimeNanos;
 try {
 attemptWrite(key, channel, nowNanos);
+
+// Following is to fix KAFKA-13559. This will prevent 
poll() from blocking for 300 ms when the
+// socket has no data but the buffer has data. Only 
happens when using SSL.
+if (channel.hasBytesBuffered())
+madeReadProgressLastPoll = true;

Review Comment:
   @splett2 
   
   How about saying this in the comment:
   
   
   > After reading from the channel, "madeReadProgressLastPoll" was set to 
"true". However, because channel is muted while processing the previous 
request, "madeReadProgressLastPoll" was set to "false" by "clear()". Now that a 
response has been sent and channel is unmuted, we need to set 
"madeReadProgressLastPoll" back to "true" if there is additional data in the 
buffer, to ensure we process it immediately.
   
   






[GitHub] [kafka] badaiaqrandista commented on a diff in pull request #12416: KAFKA-13559: Fix issue where responses intermittently takes 300+ ms to respond, even when the server is idle.

2022-07-21 Thread GitBox


badaiaqrandista commented on code in PR #12416:
URL: https://github.com/apache/kafka/pull/12416#discussion_r927095680


##
clients/src/main/java/org/apache/kafka/common/network/Selector.java:
##
@@ -591,6 +591,11 @@ void pollSelectionKeys(Set selectionKeys,
 long nowNanos = channelStartTimeNanos != 0 ? 
channelStartTimeNanos : currentTimeNanos;
 try {
 attemptWrite(key, channel, nowNanos);
+
+// Following is to fix KAFKA-13559. This will prevent 
poll() from blocking for 300 ms when the
+// socket has no data but the buffer has data. Only 
happens when using SSL.
+if (channel.hasBytesBuffered())
+madeReadProgressLastPoll = true;

Review Comment:
   @splett2 
   
   How about saying this in the comment:
   
   ```
   After reading from the channel, "madeReadProgressLastPoll" was set to 
"true". However, because channel is muted while processing the previous 
request, "madeReadProgressLastPoll" was set to "false" by "clear()". Now that a 
response has been sent and channel is unmuted, we need to set 
"madeReadProgressLastPoll" back to "true" if there is additional data in the 
buffer, to ensure we process it immediately.
   ```
   






[jira] [Created] (KAFKA-14097) Separate configuration for producer ID expiry

2022-07-21 Thread Justine Olshan (Jira)
Justine Olshan created KAFKA-14097:
--

 Summary:  Separate configuration for producer ID expiry
 Key: KAFKA-14097
 URL: https://issues.apache.org/jira/browse/KAFKA-14097
 Project: Kafka
  Issue Type: Improvement
Reporter: Justine Olshan


Ticket to track KIP-854. Currently, time-based producer ID expiration is
controlled by `transactional.id.expiration.ms`, but we want to create a separate
config. This can give us finer control over memory usage, especially since
producer IDs will be more common with idempotency becoming the default.


See 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-854+Separate+configuration+for+producer+ID+expiry
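
As a point of reference, the broker-level setting proposed by KIP-854 would look roughly like this in server.properties. The new config name and default are the ones proposed in the KIP and should be verified against the accepted version before relying on them.

{code}
# Proposed by KIP-854 (assumed name/default; check the KIP): expire idle
# producer IDs on their own schedule, default 1 day.
producer.id.expiration.ms=86400000
# Existing setting that previously governed producer ID expiry too, default 7 days.
transactional.id.expiration.ms=604800000
{code}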





[GitHub] [kafka] akatona84 commented on pull request #11565: KAFKA-13504: Retry connect internal topics' creation in case of InvalidReplicationFactorException

2022-07-21 Thread GitBox


akatona84 commented on PR #11565:
URL: https://github.com/apache/kafka/pull/11565#issuecomment-1191893077

   @urbandan , @lhunyady I've reworked the change; it now has a timeout
exception and timeout-based retries (instead of retry counts).
   Could you take a look, please?
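
For readers following along, here is a generic sketch of deadline-based retrying in plain Java (not the actual Connect code; the names are illustrative): keep retrying until a deadline instead of for a fixed number of attempts, and surface a timeout exception once the deadline passes.

```java
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

// Generic illustration of timeout-based retries (not the actual Connect/Admin code).
final class TimeoutRetry {
    static <T> T retryUntilTimeout(Supplier<T> attempt, long timeoutMs, long backoffMs)
            throws TimeoutException, InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        RuntimeException lastError = null;
        while (System.currentTimeMillis() < deadline) {
            try {
                return attempt.get();          // e.g. create the internal topic
            } catch (RuntimeException e) {     // e.g. a transient creation failure
                lastError = e;
                Thread.sleep(backoffMs);       // wait before the next attempt
            }
        }
        TimeoutException timeout = new TimeoutException("Gave up after " + timeoutMs + " ms");
        if (lastError != null) {
            timeout.initCause(lastError);
        }
        throw timeout;
    }

    public static void main(String[] args) throws Exception {
        // Succeeds on the first attempt in this toy example.
        System.out.println(retryUntilTimeout(() -> "topic created", 5_000L, 200L));
    }
}
```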





[GitHub] [kafka] guozhangwang merged pull request #12427: KAFKA-10199: Add tasks to state updater when they are created

2022-07-21 Thread GitBox


guozhangwang merged PR #12427:
URL: https://github.com/apache/kafka/pull/12427





[GitHub] [kafka] guozhangwang commented on a diff in pull request #12427: KAFKA-10199: Add tasks to state updater when they are created

2022-07-21 Thread GitBox


guozhangwang commented on code in PR #12427:
URL: https://github.com/apache/kafka/pull/12427#discussion_r927038139


##
streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java:
##
@@ -273,4 +281,70 @@ public static boolean isCheckSupplierCall() {
 return Arrays.stream(Thread.currentThread().getStackTrace())
 .anyMatch(caller -> 
"org.apache.kafka.streams.internals.ApiUtils".equals(caller.getClassName()) && 
"checkSupplier".equals(caller.getMethodName()));
 }
+
+public static StreamTask createStatefulTask(final TaskId taskId,

Review Comment:
   nit: we have other unit test classes duplicating this logic, e.g. in the 
`DefaultStateUpdaterTest` above, also in `StreamTaskTest#createStatelessTask`. 
Could we consolidate them all in this class?



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java:
##
@@ -136,10 +139,14 @@ private void createActiveTasks(final Map> activeTask
 
 if (!activeTasksToCreate.isEmpty()) {
 for (final Task activeTask : 
activeTaskCreator.createTasks(mainConsumer, activeTasksToCreate)) {
-activeTasksPerId.put(activeTask.id(), activeTask);
-pendingActiveTasks.remove(activeTask.id());
-for (final TopicPartition topicPartition : 
activeTask.inputPartitions()) {
-activeTasksPerPartition.put(topicPartition, activeTask);
+if (stateUpdater != null) {

Review Comment:
   This is a meta thought: I think we should consider extracting the creation 
of tasks out of `Tasks` and into the `TaskManager`, and hence also not include 
`StateUpdater` into `Tasks`. Maybe we can do that later in a follow-up 
refactoring if you agree.






[GitHub] [kafka] guozhangwang commented on pull request #12428: fix flaky test test_standby_tasks_rebalance

2022-07-21 Thread GitBox


guozhangwang commented on PR #12428:
URL: https://github.com/apache/kafka/pull/12428#issuecomment-1191841287

   Merged to trunk.





[GitHub] [kafka] guozhangwang merged pull request #12428: fix flaky test test_standby_tasks_rebalance

2022-07-21 Thread GitBox


guozhangwang merged PR #12428:
URL: https://github.com/apache/kafka/pull/12428





[GitHub] [kafka] guozhangwang commented on a diff in pull request #12397: KAFKA-10199: Cleanup TaskManager and Task interfaces

2022-07-21 Thread GitBox


guozhangwang commented on code in PR #12397:
URL: https://github.com/apache/kafka/pull/12397#discussion_r927019010


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java:
##
@@ -262,51 +294,30 @@ Collection tasks(final Collection taskIds) {
 
 // TODO: change return type to `StreamTask`
 Collection activeTasks() {
-return readOnlyActiveTasks;
-}
-
-Collection allTasks() {
-return readOnlyTasks;
-}
-
-Collection notPausedActiveTasks() {
-return new ArrayList<>(readOnlyActiveTasks)
-.stream()
-.filter(t -> !topologyMetadata.isPaused(t.id().topologyName()))
-.collect(Collectors.toList());
+return Collections.unmodifiableCollection(activeTasksPerId.values());
 }
 
-Collection notPausedTasks() {
-return new ArrayList<>(readOnlyTasks)
-.stream()
-.filter(t -> !topologyMetadata.isPaused(t.id().topologyName()))
-.collect(Collectors.toList());
+/**
+ * All tasks returned by any of the getters are read-only and should NOT 
be modified;
+ * and the returned task could be modified by other threads concurrently
+ */
+Set allTasks() {
+return union(HashSet::new, new HashSet<>(activeTasksPerId.values()), 
new HashSet<>(standbyTasksPerId.values()));
 }
 
-Set activeTaskIds() {
-return readOnlyActiveTaskIds;
+Set allTaskIds() {
+return union(HashSet::new, activeTasksPerId.keySet(), 
standbyTasksPerId.keySet());
 }
 
-Set standbyTaskIds() {
-return readOnlyStandbyTaskIds;
-}
-
-// TODO: change return type to `StreamTask`
-Map activeTaskMap() {
-return readOnlyActiveTasksPerId;
-}
-
-// TODO: change return type to `StandbyTask`
-Map standbyTaskMap() {
-return readOnlyStandbyTasksPerId;
-}
-
-Map tasksPerId() {
-return readOnlyTasksPerId;
+Map allTasksPerId() {
+final Map ret = new HashMap<>();
+ret.putAll(activeTasksPerId);
+ret.putAll(standbyTasksPerId);
+return ret;

Review Comment:
   Sounds good






[GitHub] [kafka] cmccabe commented on pull request #12396: KAFKA-14051: Create metrics reporters in KRaft remote controllers

2022-07-21 Thread GitBox


cmccabe commented on PR #12396:
URL: https://github.com/apache/kafka/pull/12396#issuecomment-1191834126

   retest this please





[GitHub] [kafka] cmccabe commented on pull request #12396: KAFKA-14051: Create metrics reporters in KRaft remote controllers

2022-07-21 Thread GitBox


cmccabe commented on PR #12396:
URL: https://github.com/apache/kafka/pull/12396#issuecomment-1191834062

   LGTM





[jira] [Updated] (KAFKA-14096) Race Condition in Log Rolling Leading to Disk Failure

2022-07-21 Thread Eric Azama (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14096?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eric Azama updated KAFKA-14096:
---
Description: 
We've recently encountered what appears to be a race condition that can lead to 
disk being marked offline. One of our brokers recently crashed because its log 
directory failed. We found the following in the server.log file
{code:java}
[2022-07-18 18:24:42,940] INFO [Log partition=TOPIC-REDACTED-15, 
dir=/data1/kafka-logs] Rolled new log segment at offset 141946850 in 37 ms. 
(kafka.log.Log)
[...]
[2022-07-18 18:24:47,782] INFO [Log partition=TOPIC-REDACTED-15, 
dir=/data1/kafka-logs] Scheduling segments for deletion 
List(LogSegment(baseOffset=141935201, size=1073560219, 
lastModifiedTime=1658168598869, largestTime=1658168495678)) (kafka.log.Log)
[2022-07-18 18:24:48,024] ERROR Error while flushing log for TOPIC-REDACTED-15 
in dir /data1/kafka-logs with offset 141935201 
(kafka.server.LogDirFailureChannel)
java.nio.channels.ClosedChannelException
at java.base/sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:150)
at java.base/sun.nio.ch.FileChannelImpl.force(FileChannelImpl.java:452)
at org.apache.kafka.common.record.FileRecords.flush(FileRecords.java:176)
at kafka.log.LogSegment.$anonfun$flush$1(LogSegment.scala:472)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
at kafka.log.LogSegment.flush(LogSegment.scala:471)
at kafka.log.Log.$anonfun$flush$4(Log.scala:1956)
at kafka.log.Log.$anonfun$flush$4$adapted(Log.scala:1955)
at scala.collection.Iterator.foreach(Iterator.scala:941)
at scala.collection.Iterator.foreach$(Iterator.scala:941)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at kafka.log.Log.$anonfun$flush$2(Log.scala:1955)
at kafka.log.Log.flush(Log.scala:2322)
at kafka.log.Log.$anonfun$roll$9(Log.scala:1925)
at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114)
at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at 
java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
[2022-07-18 18:24:48,036] ERROR Uncaught exception in scheduled task 
'flush-log' (kafka.utils.KafkaScheduler)
org.apache.kafka.common.errors.KafkaStorageException: Error while flushing log 
for TOPIC-REDACTED-15 in dir /data1/kafka-logs with offset 141935201{code}
and the following in the log-cleaner.log file
{code:java}
[2022-07-18 18:24:47,062] INFO Cleaner 0: Cleaning 
LogSegment(baseOffset=141935201, size=1073560219, 
lastModifiedTime=1658168598869, largestTime=1658168495678) in log 
TOPIC-REDACTED-15 into 141935201 with deletion horizon 1658082163480, retaining 
deletes. (kafka.log.LogCleaner) {code}
The timing of the log-cleaner log shows that the log flush failed because the 
log segment had been cleaned and the underlying file was already renamed or 
deleted.

The stacktrace indicates that the log flush that triggered the exception was
part of the process of rolling a new log segment (at
kafka.log.Log.$anonfun$roll$9([Log.scala:1925|https://github.com/apache/kafka/blob/2.5.1/core/src/main/scala/kafka/log/Log.scala#L1925])).
This is somewhat concerning because this flush should have been scheduled with
no delay, but the exception occurred about 6 seconds after the most recent roll,
and it wasn't even the most recent file.

Our best guess is that the broker's Scheduler was overloaded to a point that 
even 0-delay tasks were backed up in the queue, but we're not aware of any 
metrics that would allow us to monitor scheduler health.

While we encountered this on a somewhat old (2.5.1) version of the Broker, 
there don't seem to be any changes in trunk that would protect against this 
kind of delay

  was:
We've recently encountered what appears to be a race condition that can lead to 
disk being marked offline. One of our brokers recently crashed because its log 
directory failed. We found the following in the server.log file
{code:java}
[2022-07-18 18:24:42,940] INFO [Log partition=TOPIC-REDACTED-15, 
dir=/data1/kafka-logs] Rolled new log segment at offset 141946850 in 37 ms. 
(kafka.log.Log)
[...]
[2022-07-18 18:24:47,782] INFO [Log partition=TOPIC-REDACTED-15, 
dir=/data1/kafka-logs] Scheduling segments for deletion 
List(LogSegment(baseOffset=141935201, size=1073560219, 

[jira] [Updated] (KAFKA-14096) Race Condition in Log Rolling Leading to Disk Failure

2022-07-21 Thread Eric Azama (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14096?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eric Azama updated KAFKA-14096:
---
Description: 
We've recently encountered what appears to be a race condition that can lead to 
disk being marked offline. One of our brokers recently crashed because its log 
directory failed. We found the following in the server.log file
{code:java}
[2022-07-18 18:24:42,940] INFO [Log partition=TOPIC-REDACTED-15, 
dir=/data1/kafka-logs] Rolled new log segment at offset 141946850 in 37 ms. 
(kafka.log.Log)
[...]
[2022-07-18 18:24:47,782] INFO [Log partition=TOPIC-REDACTED-15, 
dir=/data1/kafka-logs] Scheduling segments for deletion 
List(LogSegment(baseOffset=141935201, size=1073560219, 
lastModifiedTime=1658168598869, largestTime=1658168495678)) (kafka.log.Log)
[2022-07-18 18:24:48,024] ERROR Error while flushing log for TOPIC-REDACTED-15 
in dir /data1/kafka-logs with offset 141935201 
(kafka.server.LogDirFailureChannel)
java.nio.channels.ClosedChannelException
at java.base/sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:150)
at java.base/sun.nio.ch.FileChannelImpl.force(FileChannelImpl.java:452)
at org.apache.kafka.common.record.FileRecords.flush(FileRecords.java:176)
at kafka.log.LogSegment.$anonfun$flush$1(LogSegment.scala:472)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
at kafka.log.LogSegment.flush(LogSegment.scala:471)
at kafka.log.Log.$anonfun$flush$4(Log.scala:1956)
at kafka.log.Log.$anonfun$flush$4$adapted(Log.scala:1955)
at scala.collection.Iterator.foreach(Iterator.scala:941)
at scala.collection.Iterator.foreach$(Iterator.scala:941)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at kafka.log.Log.$anonfun$flush$2(Log.scala:1955)
at kafka.log.Log.flush(Log.scala:2322)
at kafka.log.Log.$anonfun$roll$9(Log.scala:1925)
at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114)
at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at 
java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
[2022-07-18 18:24:48,036] ERROR Uncaught exception in scheduled task 
'flush-log' (kafka.utils.KafkaScheduler)
org.apache.kafka.common.errors.KafkaStorageException: Error while flushing log 
for TOPIC-REDACTED-15 in dir /data1/kafka-logs with offset 141935201{code}
and the following in the log-cleaner.log file
{code:java}
[2022-07-18 18:24:47,062] INFO Cleaner 0: Cleaning 
LogSegment(baseOffset=141935201, size=1073560219, 
lastModifiedTime=1658168598869, largestTime=1658168495678) in log 
TOPIC-REDACTED-15 into 141935201 with deletion horizon 1658082163480, retaining 
deletes. (kafka.log.LogCleaner) {code}
The timing of the log-cleaner log shows that the log flush failed because the 
log segment had been cleaned and the underlying file was already renamed or 
deleted.

The stack trace indicates that the log flush that triggered the exception was 
part of the process of rolling a new log segment (at 
kafka.log.Log.$anonfun$roll$9([Log.scala:1925|https://github.com/apache/kafka/blob/2.5.1/core/src/main/scala/kafka/log/Log.scala#L1925])).
This is somewhat concerning because this flush should have been scheduled with 
no delay, yet the exception occurred about 6 seconds after the most recent 
roll, and it wasn't even the most recent file. 

Our best guess is that the broker's Scheduler was overloaded to a point that 
even 0-delay tasks were backed up in the queue, but we're not aware of any 
metrics that would allow us to monitor scheduler health.

While we encountered this on a somewhat old (2.5.1) version of the broker, 
there don't seem to be any changes in trunk that would protect against this 
kind of delay.
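
Not actual Kafka code, but a minimal standalone sketch of the suspected interleaving (class and variable names are made up for illustration): a single-threaded scheduler is backed up, so a zero-delay flush only runs after the deletion path has already closed and removed the segment file, and the flush then fails with ClosedChannelException.
{code:java}
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class FlushRaceSketch {
    public static void main(String[] args) throws Exception {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        Path segment = Files.createTempFile("segment", ".log");
        FileChannel channel = FileChannel.open(segment, StandardOpenOption.WRITE);

        // A long-running task keeps the single scheduler thread busy, so even
        // zero-delay tasks queue up behind it.
        scheduler.schedule(FlushRaceSketch::busyWork, 0, TimeUnit.MILLISECONDS);

        // The "flush-log" task is scheduled with no delay (like the roll path does),
        // but it only runs once the scheduler thread is free again.
        scheduler.schedule(() -> {
            try {
                channel.force(true); // the segment file may already be gone by now
            } catch (IOException e) {
                System.err.println("Error while flushing log: " + e); // ClosedChannelException
            }
        }, 0, TimeUnit.MILLISECONDS);

        // Meanwhile the cleaning/deletion path closes and removes the segment.
        channel.close();
        Files.deleteIfExists(segment);

        scheduler.shutdown();
        scheduler.awaitTermination(10, TimeUnit.SECONDS);
    }

    private static void busyWork() {
        try {
            Thread.sleep(6_000); // stands in for a backlog of other scheduled work
        } catch (InterruptedException ignored) { }
    }
}
{code}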

  was:
We've recently encountered what appears to be a race condition that can lead to 
disk being marked offline. One of our brokers recently crashed because its log 
directory failed. We found the following in the server.log file
{code:java}
[2022-07-18 18:24:42,940] INFO [Log partition=TOPIC-REDACTED-15, 
dir=/data1/kafka-logs] Rolled new log segment at offset 141946850 in 37 ms. 
(kafka.log.Log)
[...]
[2022-07-18 18:24:47,782] INFO [Log partition=TOPIC-REDACTED-15, 
dir=/data1/kafka-logs] Scheduling segments for deletion 
List(LogSegment(baseOffset=141935201, size=1073560219, 
lastModifiedTime=1658168598869, largestTime=1658168495678)) (kafka.log.Log)
[2022-07-18 

[jira] [Created] (KAFKA-14096) Race Condition in Log Rolling Leading to Disk Failure

2022-07-21 Thread Eric Azama (Jira)
Eric Azama created KAFKA-14096:
--

 Summary: Race Condition in Log Rolling Leading to Disk Failure
 Key: KAFKA-14096
 URL: https://issues.apache.org/jira/browse/KAFKA-14096
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.5.1
Reporter: Eric Azama


We've recently encountered what appears to be a race condition that can lead to 
disk being marked offline. One of our brokers recently crashed because its log 
directory failed. We found the following in the server.log file
{code:java}
[2022-07-18 18:24:42,940] INFO [Log partition=TOPIC-REDACTED-15, 
dir=/data1/kafka-logs] Rolled new log segment at offset 141946850 in 37 ms. 
(kafka.log.Log)
[...]
[2022-07-18 18:24:47,782] INFO [Log partition=TOPIC-REDACTED-15, 
dir=/data1/kafka-logs] Scheduling segments for deletion 
List(LogSegment(baseOffset=141935201, size=1073560219, 
lastModifiedTime=1658168598869, largestTime=1658168495678)) (kafka.log.Log)
[2022-07-18 18:24:48,024] ERROR Error while flushing log for TOPIC-REDACTED-15 
in dir /data1/kafka-logs with offset 141935201 
(kafka.server.LogDirFailureChannel)
java.nio.channels.ClosedChannelException
at java.base/sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:150)
at java.base/sun.nio.ch.FileChannelImpl.force(FileChannelImpl.java:452)
at org.apache.kafka.common.record.FileRecords.flush(FileRecords.java:176)
at kafka.log.LogSegment.$anonfun$flush$1(LogSegment.scala:472)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
at kafka.log.LogSegment.flush(LogSegment.scala:471)
at kafka.log.Log.$anonfun$flush$4(Log.scala:1956)
at kafka.log.Log.$anonfun$flush$4$adapted(Log.scala:1955)
at scala.collection.Iterator.foreach(Iterator.scala:941)
at scala.collection.Iterator.foreach$(Iterator.scala:941)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at kafka.log.Log.$anonfun$flush$2(Log.scala:1955)
at kafka.log.Log.flush(Log.scala:2322)
at kafka.log.Log.$anonfun$roll$9(Log.scala:1925)
at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114)
at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at 
java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
[2022-07-18 18:24:48,036] ERROR Uncaught exception in scheduled task 
'flush-log' (kafka.utils.KafkaScheduler)
org.apache.kafka.common.errors.KafkaStorageException: Error while flushing log 
for TOPIC-REDACTED-15 in dir /data1/kafka-logs with offset 141935201{code}
and the following in the log-cleaner.log file
{code:java}
[2022-07-18 18:24:47,062] INFO Cleaner 0: Cleaning 
LogSegment(baseOffset=141935201, size=1073560219, 
lastModifiedTime=1658168598869, largestTime=1658168495678) in log 
TOPIC-REDACTED-15 into 141935201 with deletion horizon 1658082163480, retaining 
deletes. (kafka.log.LogCleaner) {code}
 

The timing of the log-cleaner log shows that the log flush failed because the 
log segment had been cleaned and the underlying file was already renamed or 
deleted.


The stacktrace indicates that the log flush that triggered the exception was 
part of the process of rolling a new log segment. (at 
kafka.log.Log.$anonfun$roll$9([Log.scala:1925|[https://github.com/apache/kafka/blob/2.5.1/core/src/main/scala/kafka/log/Log.scala#L1925]))]
 This is somewhat concerning because this flush should have been scheduled with 
no delay, but the exception occurred about 6 second after the most recent roll, 
and it wasn't even the most recent file. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] lihaosky commented on pull request #12428: fix flaky test test_standby_tasks_rebalance

2022-07-21 Thread GitBox


lihaosky commented on PR #12428:
URL: https://github.com/apache/kafka/pull/12428#issuecomment-1191770729

   @suhas-satish, good point. That JIRA ticket is internal; let me paste the 
comments there.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] suhas-satish commented on pull request #12428: fix flaky test test_standby_tasks_rebalance

2022-07-21 Thread GitBox


suhas-satish commented on PR #12428:
URL: https://github.com/apache/kafka/pull/12428#issuecomment-1191745715

   @lihaosky, can you reference the JIRA ticket with the discussion as the 
rationale for this change, either in this PR or in the code as comments? Thanks 
for the fix.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-14095) Improve handling of sync offset failures in MirrorMaker

2022-07-21 Thread Mickael Maison (Jira)
Mickael Maison created KAFKA-14095:
--

 Summary: Improve handling of sync offset failures in MirrorMaker
 Key: KAFKA-14095
 URL: https://issues.apache.org/jira/browse/KAFKA-14095
 Project: Kafka
  Issue Type: Improvement
  Components: mirrormaker
Reporter: Mickael Maison
Assignee: Mickael Maison


MirrorMaker can automatically sync offsets when sync.group.offsets.enabled is 
set to true. However, that only works if the consumer groups are not in use in 
the target cluster; otherwise committing offsets fails with UNKNOWN_MEMBER_ID. 

Currently we treat this case as an error in the admin client, so it emits an 
ERROR-level log line. This is misleading, as it's not an unexpected error, and 
we should treat it the same way as other group-level error codes.

In addition, we should check the result of the alterConsumerGroupOffsets call in 
MirrorMaker and provide a helpful message in case of UNKNOWN_MEMBER_ID errors.
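
As a rough sketch of the kind of check described above (not MirrorMaker's actual code; the helper name and messages are made up), the result of alterConsumerGroupOffsets could be inspected and the UNKNOWN_MEMBER_ID case downgraded to an informational message:
{code:java}
import java.util.Map;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnknownMemberIdException;

public class OffsetSyncSketch {

    // Commit offsets for a group and downgrade the UNKNOWN_MEMBER_ID case to an
    // informational message instead of surfacing it as an unexpected error.
    static void syncGroupOffsets(final Admin admin,
                                 final String group,
                                 final Map<TopicPartition, OffsetAndMetadata> offsets)
            throws InterruptedException {
        try {
            admin.alterConsumerGroupOffsets(group, offsets).all().get();
        } catch (final ExecutionException e) {
            if (e.getCause() instanceof UnknownMemberIdException) {
                System.out.println("Not syncing offsets for group " + group
                        + ": the group is in use on the target cluster");
            } else {
                throw new RuntimeException("Failed to sync offsets for group " + group, e.getCause());
            }
        }
    }
}
{code}
MirrorMaker would presumably log through its own logger rather than System.out; the point is only to branch on the cause of the ExecutionException.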



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] lihaosky opened a new pull request, #12428: fix flaky test test_standby_tasks_rebalance

2022-07-21 Thread GitBox


lihaosky opened a new pull request, #12428:
URL: https://github.com/apache/kafka/pull/12428

   ### Description
   In this test, when the third proc joins, other rebalance scenarios can 
sometimes occur, such as a follow-up JoinGroup request arriving before the 
SyncGroup response is received by one of the procs, so the tasks previously 
assigned to that proc are lost during the new JoinGroup request. This can 
result in standby tasks assigned as `3, 1, 2`. This PR relaxes the expected 
assignment of `2, 2, 2` to a range of `[1-3]`.
   
   ### Testing
   Ran existing test.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (KAFKA-13700) Kafka reporting CorruptRecordException exception

2022-07-21 Thread Jun Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17569563#comment-17569563
 ] 

Jun Rao edited comment on KAFKA-13700 at 7/21/22 5:04 PM:
--

Interesting. So the CRC validation passes with DumpLogSegments, but fails in 
ReplicaFetcherThread? This is a bit weird since the validation is the same in 
both places. One possibility is that there is some issue in the network. Is the 
error in ReplicaFetcherThread persistent or transient? As Divij asked earlier, 
does this error occur in other topic partitions?


was (Author: junrao):
Interesting. So the CRC validation passes with DumpLogSegments, but fails in 
ReplicaFetcherThread? This is a bit weird since the validation is the same in 
both places. Is the error in ReplicaFetcherThread persistent or transient?

> Kafka reporting CorruptRecordException exception
> 
>
> Key: KAFKA-13700
> URL: https://issues.apache.org/jira/browse/KAFKA-13700
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.4.1
> Environment: ubuntu 16.04
> kafka 2.4
>Reporter: Uday Bhaskar
>Priority: Critical
>
> In our Kafka cluster, a couple of partitions in __consumer_offsets and one 
> regular topic are hitting a data corruption issue while replicas try to read 
> from the leader. Similar messages appear for other partitions as well. 
>  
> [2022-02-28 21:57:29,941] ERROR [ReplicaFetcher replicaId=6, leaderId=1, 
> fetcherId=2] Found invalid messages during fetch for partition 
> __consumer_offsets-10 offset 108845487 (kafka.server.ReplicaFetcherThread)
> org.apache.kafka.common.errors.CorruptRecordException: Record is corrupt 
> (stored crc = 1524235439) in topic partition __consumer_offsets-10
>  
> another topic partitions with same errors
> [2022-02-28 22:17:00,235] ERROR [ReplicaFetcher replicaId=6, leaderId=1, 
> fetcherId=0] Found invalid messages during fetch for partition 
> px-11351-xx-a56c642-0 offset 11746872 (kafka.server.ReplicaFetcherThread)
> org.apache.kafka.common.errors.CorruptRecordException: Record is corrupt 
> (stored crc = 475179617) in topic partition px-11351-xx-a56c642-0.
>  
> I have verified all infrastructure, disk, network and system for any errors 
> and found nothing. I am not sure why it is happening or how to 
> troubleshoot it.  
>  
> Below is the output for the message from DumpLogSegments: 
>  
> $ /opt/ns/kafka/bin/kafka-run-class.sh kafka.tools.DumpLogSegments 
> --verify-index-only --deep-iteration --files ./11324034.log | 
> grep 11746872
> baseOffset: 11746872 lastOffset: 11746872 count: 1 baseSequence: 50278 
> lastSequence: 50278 producerId: 17035 producerEpoch: 0 partitionLeaderEpoch: 
> 8 isTransactional: false isControl: false position: 252530345 CreateTime: 
> 1645886348240 size: 647 magic: 2 compresscodec: SNAPPY crc: 475179617 
> isvalid: true
> | offset: 11746872 CreateTime: 1645886348240 keysize: 54 valuesize: 637 
> sequence: 50278 headerKeys: []



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-13700) Kafka reporting CorruptRecordException exception

2022-07-21 Thread Jun Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17569563#comment-17569563
 ] 

Jun Rao commented on KAFKA-13700:
-

Interesting. So the CRC validation passes with DumpLogSegments, but fails in 
ReplicaFetcherThread? This is a bit weird since the validation is the same in 
both places. Is the error in ReplicaFetcherThread persistent or transient?

> Kafka reporting CorruptRecordException exception
> 
>
> Key: KAFKA-13700
> URL: https://issues.apache.org/jira/browse/KAFKA-13700
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.4.1
> Environment: ubuntu 16.04
> kafka 2.4
>Reporter: Uday Bhaskar
>Priority: Critical
>
> In our Kafka cluster, a couple of partitions in __consumer_offsets and one 
> regular topic are hitting a data corruption issue while replicas try to read 
> from the leader. Similar messages appear for other partitions as well. 
>  
> [2022-02-28 21:57:29,941] ERROR [ReplicaFetcher replicaId=6, leaderId=1, 
> fetcherId=2] Found invalid messages during fetch for partition 
> __consumer_offsets-10 offset 108845487 (kafka.server.ReplicaFetcherThread)
> org.apache.kafka.common.errors.CorruptRecordException: Record is corrupt 
> (stored crc = 1524235439) in topic partition __consumer_offsets-10
>  
> another topic partitions with same errors
> [2022-02-28 22:17:00,235] ERROR [ReplicaFetcher replicaId=6, leaderId=1, 
> fetcherId=0] Found invalid messages during fetch for partition 
> px-11351-xx-a56c642-0 offset 11746872 (kafka.server.ReplicaFetcherThread)
> org.apache.kafka.common.errors.CorruptRecordException: Record is corrupt 
> (stored crc = 475179617) in topic partition px-11351-xx-a56c642-0.
>  
> I have verified all infrastructure, disk, network and system for any errors 
> and found nothing. I am not sure why it is happening or how to 
> troubleshoot it.  
>  
> Below is the output for the message from DumpLogSegments: 
>  
> $ /opt/ns/kafka/bin/kafka-run-class.sh kafka.tools.DumpLogSegments 
> --verify-index-only --deep-iteration --files ./11324034.log | 
> grep 11746872
> baseOffset: 11746872 lastOffset: 11746872 count: 1 baseSequence: 50278 
> lastSequence: 50278 producerId: 17035 producerEpoch: 0 partitionLeaderEpoch: 
> 8 isTransactional: false isControl: false position: 252530345 CreateTime: 
> 1645886348240 size: 647 magic: 2 compresscodec: SNAPPY crc: 475179617 
> isvalid: true
> | offset: 11746872 CreateTime: 1645886348240 keysize: 54 valuesize: 637 
> sequence: 50278 headerKeys: []



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-13868) Website updates to satisfy Apache privacy policies

2022-07-21 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13868?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17569561#comment-17569561
 ] 

Mickael Maison commented on KAFKA-13868:


[~junrao] Yes the email was sent to the Kafka private list. Here is the link to 
the email: https://lists.apache.org/thread/w7w5vvc996qdtdhbmf7qf826g9v5fnyo

> Website updates to satisfy Apache privacy policies
> --
>
> Key: KAFKA-13868
> URL: https://issues.apache.org/jira/browse/KAFKA-13868
> Project: Kafka
>  Issue Type: Bug
>  Components: website
>Reporter: Mickael Maison
>Assignee: Divij Vaidya
>Priority: Critical
>
> The ASF has updated its privacy policy and all websites must be compliant.
> The full guidelines can be found in 
> [https://privacy.apache.org/faq/committers.html]
> The Kafka website has a few issues, including:
> - It's missing a link to the privacy policy: 
> [https://privacy.apache.org/policies/privacy-policy-public.html]
> - It's using Google Analytics
> - It's using Google Fonts
> - It's using scripts hosted on Cloudflare CDN
> - Embedded videos don't have an image placeholder
> As per the email sent to the PMC, all updates have to be done by July 22.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-13868) Website updates to satisfy Apache privacy policies

2022-07-21 Thread Jun Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13868?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17569525#comment-17569525
 ] 

Jun Rao commented on KAFKA-13868:
-

[~mimaison]: You mentioned "As per the email sent to the PMC, all updates have 
to be done by July 22." Do you know when the email related to this was sent to 
the Kafka PMC? I can't seem to find it. Thanks.

> Website updates to satisfy Apache privacy policies
> --
>
> Key: KAFKA-13868
> URL: https://issues.apache.org/jira/browse/KAFKA-13868
> Project: Kafka
>  Issue Type: Bug
>  Components: website
>Reporter: Mickael Maison
>Assignee: Divij Vaidya
>Priority: Critical
>
> The ASF has updated its privacy policy and all websites must be compliant.
> The full guidelines can be found in 
> [https://privacy.apache.org/faq/committers.html]
> The Kafka website has a few issues, including:
> - It's missing a link to the privacy policy: 
> [https://privacy.apache.org/policies/privacy-policy-public.html]
> - It's using Google Analytics
> - It's using Google Fonts
> - It's using scripts hosted on Cloudflare CDN
> - Embedded videos don't have an image placeholder
> As per the email sent to the PMC, all updates have to be done by July 22.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14094) KIP-853: KRaft Voters Change

2022-07-21 Thread Jira
José Armando García Sancio created KAFKA-14094:
--

 Summary: KIP-853: KRaft Voters Change
 Key: KAFKA-14094
 URL: https://issues.apache.org/jira/browse/KAFKA-14094
 Project: Kafka
  Issue Type: Improvement
  Components: kraft
Affects Versions: 3.3.0
Reporter: José Armando García Sancio
 Fix For: 3.3.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] cadonna commented on a diff in pull request #12427: KAFKA-10199: Add tasks to state updater when they are created

2022-07-21 Thread GitBox


cadonna commented on code in PR #12427:
URL: https://github.com/apache/kafka/pull/12427#discussion_r926852811


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##
@@ -129,7 +128,7 @@ public void shouldThrowIfStatelessTaskNotInStateRestoring() 
{
 
 @Test
 public void shouldThrowIfStatefulTaskNotInStateRestoring() {
-
shouldThrowIfActiveTaskNotInStateRestoring(createActiveStatefulTask(TASK_0_0, 
Collections.singletonList(TOPIC_PARTITION_A_0)));
+
shouldThrowIfActiveTaskNotInStateRestoring(createActiveStatefulTask(TASK_0_0, 
mkSet(TOPIC_PARTITION_A_0)));

Review Comment:
   All changes here are just replacements of collections with sets.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cadonna opened a new pull request, #12427: KAFKA-10199: Add tasks to state updater when they are created

2022-07-21 Thread GitBox


cadonna opened a new pull request, #12427:
URL: https://github.com/apache/kafka/pull/12427

   This PR introduces an internal config to enable the state updater.
   
   If the state updater is enabled, newly created tasks are added to
   the state updater.
   
   The integration of the state updater starts with this PR and is not
   finished.
   
   Additionally, this PR introduces a builder for mocks for tasks.
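
   As a rough illustration of what such a builder could look like (a hypothetical sketch using Mockito and a stand-in Task interface, not the actual classes touched by this PR):
   ```java
   import static org.mockito.Mockito.mock;
   import static org.mockito.Mockito.when;

   public class TaskMockBuilderSketch {

       // Stand-in for Streams' internal Task interface, just for this sketch.
       interface Task {
           String id();
           boolean isActive();
       }

       // Builder that produces preconfigured Mockito mocks with sensible defaults.
       static class TaskMockBuilder {
           private String id = "0_0";
           private boolean active = true;

           TaskMockBuilder withId(final String id) { this.id = id; return this; }
           TaskMockBuilder standby() { this.active = false; return this; }

           Task build() {
               final Task task = mock(Task.class);
               when(task.id()).thenReturn(id);
               when(task.isActive()).thenReturn(active);
               return task;
           }
       }

       public static void main(final String[] args) {
           final Task standbyTask = new TaskMockBuilder().withId("1_0").standby().build();
           System.out.println(standbyTask.id() + " active=" + standbyTask.isActive());
       }
   }
   ```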
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-13917) Avoid calling lookupCoordinator() in tight loop

2022-07-21 Thread Viktor Somogyi-Vass (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13917?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Viktor Somogyi-Vass updated KAFKA-13917:

Affects Version/s: 3.3.0
   3.2.1

> Avoid calling lookupCoordinator() in tight loop
> ---
>
> Key: KAFKA-13917
> URL: https://issues.apache.org/jira/browse/KAFKA-13917
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Affects Versions: 3.1.0, 3.1.1, 3.3.0, 3.1.2, 3.2.1
>Reporter: Viktor Somogyi-Vass
>Assignee: Viktor Somogyi-Vass
>Priority: Major
> Fix For: 3.3.0
>
>
> Currently the heartbeat thread's lookupCoordinator() is called in a tight 
> loop if brokers crash and the consumer is left running. Besides flooding the 
> logs at debug level, it also increases CPU usage.
> The fix is easy: just add a backoff call after the coordinator lookup.
> Reproduction:
> # Start a few brokers
> # Create a topic and produce to it
> # Start consuming
> # Stop all brokers
> At this point lookupCoordinator() will be called in a tight loop.
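
A standalone sketch of the idea (hypothetical names, not the actual AbstractCoordinator heartbeat code): back off between coordinator lookups instead of retrying immediately.
{code:java}
import java.util.concurrent.TimeUnit;

public class CoordinatorLookupBackoffSketch {
    private static final long RETRY_BACKOFF_MS = 100L;

    public static void main(final String[] args) throws InterruptedException {
        boolean coordinatorKnown = false;
        int attempts = 0;
        while (!coordinatorKnown && attempts < 10) {
            coordinatorKnown = lookupCoordinator(++attempts);
            if (!coordinatorKnown) {
                // The fix: sleep for the retry backoff before the next lookup
                // instead of immediately retrying in a tight loop.
                TimeUnit.MILLISECONDS.sleep(RETRY_BACKOFF_MS);
            }
        }
    }

    private static boolean lookupCoordinator(final int attempt) {
        System.out.println("FindCoordinator attempt " + attempt);
        return false; // all brokers are down in this scenario
    }
}
{code}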



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] cadonna commented on pull request #12285: KAFKA-14001: Migrate streams module to JUnit 5 - Part 1

2022-07-21 Thread GitBox


cadonna commented on PR #12285:
URL: https://github.com/apache/kafka/pull/12285#issuecomment-1191644117

   @clolov @divijvaidya I just checked and these tests are not run in the 
builds. I also have another PR in streams with tests that use JUnit 5, and they 
are also not run in the builds. Does either of you know what is going on here?
   I do not want to go on with the migration from JUnit 4 to JUnit 5 if tests 
are not run until we have migrated all tests.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cadonna merged pull request #12285: KAFKA-14001: Migrate streams module to JUnit 5 - Part 1

2022-07-21 Thread GitBox


cadonna merged PR #12285:
URL: https://github.com/apache/kafka/pull/12285


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cadonna commented on pull request #12285: KAFKA-14001: Migrate streams module to JUnit 5 - Part 1

2022-07-21 Thread GitBox


cadonna commented on PR #12285:
URL: https://github.com/apache/kafka/pull/12285#issuecomment-1191623444

   Failures are not related


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14076) Fix issues with KafkaStreams.CloseOptions

2022-07-21 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-14076:

Affects Version/s: 3.3.0

> Fix issues with KafkaStreams.CloseOptions
> -
>
> Key: KAFKA-14076
> URL: https://issues.apache.org/jira/browse/KAFKA-14076
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.3.0
>Reporter: Jim Hughes
>Assignee: Jim Hughes
>Priority: Blocker
> Fix For: 3.3.0
>
>
> The new `close(CloseOptions)` function has a few bugs.  
> ([https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java#L1518-L1561)]
> Notably, it needs to remove CGs per StreamThread.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] mjsax commented on pull request #12408: KAFKA-14076: Fix issues with KafkaStreams.CloseOptions

2022-07-21 Thread GitBox


mjsax commented on PR #12408:
URL: https://github.com/apache/kafka/pull/12408#issuecomment-1191570949

   Thanks for the fix. Merged to `trunk` and cherry-picked to `3.3` branch.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax merged pull request #12408: KAFKA-14076: Fix issues with KafkaStreams.CloseOptions

2022-07-21 Thread GitBox


mjsax merged PR #12408:
URL: https://github.com/apache/kafka/pull/12408


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cadonna merged pull request #12397: KAFKA-10199: Cleanup TaskManager and Task interfaces

2022-07-21 Thread GitBox


cadonna merged PR #12397:
URL: https://github.com/apache/kafka/pull/12397


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cadonna commented on pull request #12397: KAFKA-10199: Cleanup TaskManager and Task interfaces

2022-07-21 Thread GitBox


cadonna commented on PR #12397:
URL: https://github.com/apache/kafka/pull/12397#issuecomment-1191467025

   Build failures are unrelated.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cadonna merged pull request #12426: MINOR: Fix broken link to Streams tutorial

2022-07-21 Thread GitBox


cadonna merged PR #12426:
URL: https://github.com/apache/kafka/pull/12426


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] soarez commented on pull request #12314: KAFKA-13903: Queue size metric in QuorumController

2022-07-21 Thread GitBox


soarez commented on PR #12314:
URL: https://github.com/apache/kafka/pull/12314#issuecomment-1191462380

   @mumrah a couple of the builds succeeded but some failed with a flaky test — 
`testFencedLeaderRecovery`. There is already a JIRA for it 
https://issues.apache.org/jira/browse/KAFKA-14093


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cadonna commented on pull request #12285: KAFKA-14001: Migrate streams module to JUnit 5 - Part 1

2022-07-21 Thread GitBox


cadonna commented on PR #12285:
URL: https://github.com/apache/kafka/pull/12285#issuecomment-1191451288

   Once I get acceptable builds, I am going to merge this PR!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] clolov commented on pull request #12285: KAFKA-14001: Migrate streams module to JUnit 5 - Part 1

2022-07-21 Thread GitBox


clolov commented on PR #12285:
URL: https://github.com/apache/kafka/pull/12285#issuecomment-1191436663

   Hey @cadonna, I rebased on top of trunk. Do let me know if there is 
something else you would like me to address :)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] etolbakov commented on pull request #12365: KAFKA-14020: Performance regression in Producer

2022-07-21 Thread GitBox


etolbakov commented on PR #12365:
URL: https://github.com/apache/kafka/pull/12365#issuecomment-1191436621

   Hello Artem @artemlivshits,
   
   I was studying the PR/`KAFKA-14020` ticket and decided to share a minor 
observation.
   It seems that the `extractTopicPartition` method in `ProducerInterceptors` 
could be made private (or even inlined).
   
https://github.com/apache/kafka/blob/badfbacdd09a9ee8821847f4b28d98625f354ed7/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java#L125
   Happy to make a PR if you find it reasonable.
   
   
   Regards, Eugene


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] clolov commented on a diff in pull request #12285: KAFKA-14001: Migrate streams module to JUnit 5 - Part 1

2022-07-21 Thread GitBox


clolov commented on code in PR #12285:
URL: https://github.com/apache/kafka/pull/12285#discussion_r926625230


##
streams/src/test/java/org/apache/kafka/streams/integration/ErrorHandlingIntegrationTest.java:
##
@@ -52,36 +52,46 @@
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 
+@Timeout(600)
 @Category(IntegrationTest.class)
 public class ErrorHandlingIntegrationTest {
 
+private final String testId;
+private final String appId;
+private final Properties properties;
+
+// Task 0
+private final String inputTopic;
+private final String outputTopic;
+// Task 1
+private final String errorInputTopic;
+private final String errorOutputTopic;
+
 private static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(1);
 
-@BeforeClass
+ErrorHandlingIntegrationTest(final TestInfo testInfo) {
+testId = safeUniqueTestName(getClass(), testInfo);
+appId = "appId_" + testId;
+properties = props();
+
+inputTopic = "input" + testId;
+outputTopic = "output" + testId;
+
+errorInputTopic = "error-input" + testId;
+errorOutputTopic = "error-output" + testId;
+}

Review Comment:
   I cannot remember, so maybe there was no reason. When I was rebasing I 
noticed that this test has since been deleted (or moved to 
`org.apache.kafka.connect.integration`) so I believe it is safe to say I do not 
need to address this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on pull request #11783: KAFKA-10000: System tests (KIP-618)

2022-07-21 Thread GitBox


C0urante commented on PR #11783:
URL: https://github.com/apache/kafka/pull/11783#issuecomment-1191429850

   Out of the 100 runs, 94 passed. The 6 failures all appear to be 
environmental as they were encountered while trying to allocate a console 
consumer at the end of the test with this stack trace:
   ```
   InsufficientResourcesError('linux nodes requested: 1. linux nodes available: 
0')
   Traceback (most recent call last):
 File 
"/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 
187, in _do_run
   data = self.run_test()
 File 
"/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 
265, in run_test
   return self.test_context.function(self.test)
 File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 
433, in wrapper
   return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
 File 
"/opt/kafka-dev/tests/kafkatest/tests/connect/connect_distributed_test.py", 
line 637, in test_exactly_once_source
   consumer_validator = ConsoleConsumer(self.test_context, 1, self.kafka, 
self.source.topic, consumer_timeout_ms=1000, print_key=True)
 File "/opt/kafka-dev/tests/kafkatest/services/console_consumer.py", line 
97, in __init__
   BackgroundThreadService.__init__(self, context, num_nodes)
 File 
"/usr/local/lib/python3.9/dist-packages/ducktape/services/background_thread.py",
 line 26, in __init__
   super(BackgroundThreadService, self).__init__(context, num_nodes, 
cluster_spec, *args, **kwargs)
 File 
"/usr/local/lib/python3.9/dist-packages/ducktape/services/service.py", line 
107, in __init__
   self.allocate_nodes()
 File 
"/usr/local/lib/python3.9/dist-packages/ducktape/services/service.py", line 
217, in allocate_nodes
   self.nodes = self.cluster.alloc(self.cluster_spec)
 File "/usr/local/lib/python3.9/dist-packages/ducktape/cluster/cluster.py", 
line 54, in alloc
   allocated = self.do_alloc(cluster_spec)
 File 
"/usr/local/lib/python3.9/dist-packages/ducktape/cluster/finite_subcluster.py", 
line 37, in do_alloc
   good_nodes, bad_nodes = self._available_nodes.remove_spec(cluster_spec)
 File 
"/usr/local/lib/python3.9/dist-packages/ducktape/cluster/node_container.py", 
line 131, in remove_spec
   raise InsufficientResourcesError(err)
   ducktape.cluster.node_container.InsufficientResourcesError: linux nodes 
requested: 1. linux nodes available: 0
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jnh5y commented on a diff in pull request #12408: KAFKA-14076: Fix issues with KafkaStreams.CloseOptions

2022-07-21 Thread GitBox


jnh5y commented on code in PR #12408:
URL: https://github.com/apache/kafka/pull/12408#discussion_r926562692


##
streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java:
##
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import kafka.server.KafkaConfig;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.CloseOptions;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static java.util.Collections.singletonList;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup;
+
+@Category({IntegrationTest.class})
+public class KafkaStreamsCloseOptionsIntegrationTest {
+@Rule
+public Timeout globalTimeout = Timeout.seconds(600);
+@Rule
+public final TestName testName = new TestName();
+private static MockTime mockTime;
+
+@Rule
+public final TemporaryFolder testFolder = new 
TemporaryFolder(TestUtils.tempDirectory());
+
+protected static final String INPUT_TOPIC = "inputTopic";
+protected static final String OUTPUT_TOPIC = "outputTopic";
+
+protected Properties streamsConfig;
+protected static KafkaStreams streams;
+protected static Admin adminClient;
+protected Properties commonClientConfig;
+private Properties producerConfig;
+protected Properties resultConsumerConfig;
+
+public static final EmbeddedKafkaCluster CLUSTER;
+
+static {
+final Properties brokerProps = new Properties();
+brokerProps.setProperty(KafkaConfig.GroupMaxSessionTimeoutMsProp(), 
Integer.toString(Integer.MAX_VALUE));
+CLUSTER = new EmbeddedKafkaCluster(1, brokerProps);
+}
+
+@BeforeClass
+public static void startCluster() throws IOException {
+CLUSTER.start();
+}
+
+@AfterClass
+public static void closeCluster() {
+CLUSTER.stop();
+}
+
+@Before
+public void before() throws Exception {
+mockTime = CLUSTER.time;
+
+final String appID = 
IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+
+commonClientConfig = new Properties();
+commonClientConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
+
+streamsConfig = new Properties();
+streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, 
testFolder.getRoot().getPath());
+streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.Long().getClass());
+streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.String().getClass());
+

[jira] [Commented] (KAFKA-13953) kafka Console consumer fails with CorruptRecordException

2022-07-21 Thread Doguscan Namal (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17569354#comment-17569354
 ] 

Doguscan Namal commented on KAFKA-13953:


I tried to reproduce the issue by sending a negative length for a record in the 
produce request. I overrode DefaultRecord.java with the following to corrupt 
the fifth record in the middle of a batch.

 

However, the broker detected that the record was not valid:

`org.apache.kafka.common.InvalidRecordException: This record has failed the 
validation on broker and hence will be rejected.`

 

Code that I changed to reproduce:
```
public static int writeTo(DataOutputStream out,
                          int offsetDelta,
                          long timestampDelta,
                          ByteBuffer key,
                          ByteBuffer value,
                          Header[] headers) throws IOException {
    int sizeInBytes = sizeOfBodyInBytes(offsetDelta, timestampDelta, key, value, headers);
    int sizeOfSize;
    if (offsetDelta == 5) {
        // Deliberately write a bogus negative length for the fifth record in the batch
        ByteUtils.writeVarint(-155493822, out);
        sizeOfSize = ByteUtils.sizeOfVarint(-155493822);
    } else {
        ByteUtils.writeVarint(sizeInBytes, out);
        sizeOfSize = ByteUtils.sizeOfVarint(sizeInBytes);
    }
...
    return sizeOfSize + sizeInBytes;
}
```

> kafka Console consumer fails with CorruptRecordException 
> -
>
> Key: KAFKA-13953
> URL: https://issues.apache.org/jira/browse/KAFKA-13953
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, controller, core
>Affects Versions: 2.7.0
>Reporter: Aldan Brito
>Priority: Blocker
>
> Kafka consumer fails with corrupt record exception. 
> {code:java}
> opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server *.*.*.*: 
> --topic BQR-PULL-DEFAULT --from-beginning > 
> /opt/nokia/kafka-zookeeper-clustering/kafka/topic-data/tmpsdh/dumptest
> [{*}2022-05-15 18:34:15,146]{*} ERROR Error processing message, terminating 
> consumer process:  (kafka.tools.ConsoleConsumer$)
> org.apache.kafka.common.KafkaException: Received exception when fetching the 
> next record from BQR-PULL-DEFAULT-30. If needed, please seek past the record 
> to continue consumption.
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1577)
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1432)
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:684)
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:635)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1276)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1237)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)
>         at 
> kafka.tools.ConsoleConsumer$ConsumerWrapper.receive(ConsoleConsumer.scala:438)
>         at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:104)
>         at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:78)
>         at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:55)
>         at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
> Caused by: org.apache.kafka.common.errors.CorruptRecordException: Record size 
> 0 is less than the minimum record overhead (14)
> Processed a total of 15765197 messages {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] mimaison opened a new pull request, #12426: MINOR: Fix broken link to Streams tutorial

2022-07-21 Thread GitBox


mimaison opened a new pull request, #12426:
URL: https://github.com/apache/kafka/pull/12426

   Also fix `Transforming Data Pt. 2` video title
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14093) Flaky ExactlyOnceSourceIntegrationTest.testFencedLeaderRecovery

2022-07-21 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14093?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17569318#comment-17569318
 ] 

Mickael Maison commented on KAFKA-14093:


I've only been able to reproduce it once locally but I found a couple of other 
instances from our CI: 
- https://github.com/apache/kafka/runs/7208515607
- https://github.com/apache/kafka/runs/7428998555

> Flaky ExactlyOnceSourceIntegrationTest.testFencedLeaderRecovery
> ---
>
> Key: KAFKA-14093
> URL: https://issues.apache.org/jira/browse/KAFKA-14093
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.3.0
>Reporter: Mickael Maison
>Priority: Major
> Attachments: 
> org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest.testFencedLeaderRecovery.test.stdout
>
>
> org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest > 
> testFencedLeaderRecovery FAILED
> java.lang.AssertionError: expected 
> org.apache.kafka.connect.runtime.rest.errors.ConnectRestException to be 
> thrown, but nothing was thrown



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] cadonna commented on a diff in pull request #12397: KAFKA-10199: Cleanup TaskManager and Task interfaces

2022-07-21 Thread GitBox


cadonna commented on code in PR #12397:
URL: https://github.com/apache/kafka/pull/12397#discussion_r926372626


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java:
##
@@ -262,51 +294,30 @@ Collection tasks(final Collection taskIds) {
 
 // TODO: change return type to `StreamTask`
 Collection activeTasks() {
-return readOnlyActiveTasks;
-}
-
-Collection allTasks() {
-return readOnlyTasks;
-}
-
-Collection notPausedActiveTasks() {
-return new ArrayList<>(readOnlyActiveTasks)
-.stream()
-.filter(t -> !topologyMetadata.isPaused(t.id().topologyName()))
-.collect(Collectors.toList());
+return Collections.unmodifiableCollection(activeTasksPerId.values());
 }
 
-Collection notPausedTasks() {
-return new ArrayList<>(readOnlyTasks)
-.stream()
-.filter(t -> !topologyMetadata.isPaused(t.id().topologyName()))
-.collect(Collectors.toList());
+/**
+ * All tasks returned by any of the getters are read-only and should NOT 
be modified;
+ * and the returned task could be modified by other threads concurrently
+ */
+Set allTasks() {
+return union(HashSet::new, new HashSet<>(activeTasksPerId.values()), 
new HashSet<>(standbyTasksPerId.values()));
 }
 
-Set activeTaskIds() {
-return readOnlyActiveTaskIds;
+Set allTaskIds() {
+return union(HashSet::new, activeTasksPerId.keySet(), 
standbyTasksPerId.keySet());
 }
 
-Set standbyTaskIds() {
-return readOnlyStandbyTaskIds;
-}
-
-// TODO: change return type to `StreamTask`
-Map activeTaskMap() {
-return readOnlyActiveTasksPerId;
-}
-
-// TODO: change return type to `StandbyTask`
-Map standbyTaskMap() {
-return readOnlyStandbyTasksPerId;
-}
-
-Map tasksPerId() {
-return readOnlyTasksPerId;
+Map allTasksPerId() {
+final Map ret = new HashMap<>();
+ret.putAll(activeTasksPerId);
+ret.putAll(standbyTasksPerId);
+return ret;

Review Comment:
   I would even make it unmodifiable to make it clear that changing the 
returned map will not change the maps within the `Tasks` object.
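
   A minimal sketch of that suggestion (a generic stand-in, not the actual `Tasks` code): merge the two maps and wrap the result so callers cannot modify it.
   ```java
   import java.util.Collections;
   import java.util.HashMap;
   import java.util.Map;

   final class MapViews {

       // Merge two maps into a single read-only view; mutating the returned map
       // throws UnsupportedOperationException instead of silently diverging from
       // the underlying maps.
       static <K, V> Map<K, V> unionUnmodifiable(final Map<K, V> active, final Map<K, V> standby) {
           final Map<K, V> allTasks = new HashMap<>(active);
           allTasks.putAll(standby);
           return Collections.unmodifiableMap(allTasks);
       }
   }
   ```
   The copy plus `Collections.unmodifiableMap` gives callers a snapshot they cannot mutate, which makes the read-only intent explicit.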



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java:
##
@@ -262,51 +294,30 @@ Collection tasks(final Collection taskIds) {
 
 // TODO: change return type to `StreamTask`
 Collection activeTasks() {
-return readOnlyActiveTasks;
-}
-
-Collection allTasks() {
-return readOnlyTasks;
-}
-
-Collection notPausedActiveTasks() {
-return new ArrayList<>(readOnlyActiveTasks)
-.stream()
-.filter(t -> !topologyMetadata.isPaused(t.id().topologyName()))
-.collect(Collectors.toList());
+return Collections.unmodifiableCollection(activeTasksPerId.values());
 }
 
-Collection notPausedTasks() {
-return new ArrayList<>(readOnlyTasks)
-.stream()
-.filter(t -> !topologyMetadata.isPaused(t.id().topologyName()))
-.collect(Collectors.toList());
+/**
+ * All tasks returned by any of the getters are read-only and should NOT 
be modified;
+ * and the returned task could be modified by other threads concurrently
+ */
+Set allTasks() {
+return union(HashSet::new, new HashSet<>(activeTasksPerId.values()), 
new HashSet<>(standbyTasksPerId.values()));
 }
 
-Set activeTaskIds() {
-return readOnlyActiveTaskIds;
+Set allTaskIds() {
+return union(HashSet::new, activeTasksPerId.keySet(), 
standbyTasksPerId.keySet());
 }
 
-Set standbyTaskIds() {
-return readOnlyStandbyTaskIds;
-}
-
-// TODO: change return type to `StreamTask`
-Map activeTaskMap() {
-return readOnlyActiveTasksPerId;
-}
-
-// TODO: change return type to `StandbyTask`
-Map standbyTaskMap() {
-return readOnlyStandbyTasksPerId;
-}
-
-Map tasksPerId() {
-return readOnlyTasksPerId;
+Map allTasksPerId() {
+final Map ret = new HashMap<>();

Review Comment:
   nit: `ret` is not really a meaningful variable name.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14093) Flaky ExactlyOnceSourceIntegrationTest.testFencedLeaderRecovery

2022-07-21 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14093?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17569313#comment-17569313
 ] 

Mickael Maison commented on KAFKA-14093:


cc [~ChrisEgerton]

> Flaky ExactlyOnceSourceIntegrationTest.testFencedLeaderRecovery
> ---
>
> Key: KAFKA-14093
> URL: https://issues.apache.org/jira/browse/KAFKA-14093
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.3.0
>Reporter: Mickael Maison
>Priority: Major
> Attachments: 
> org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest.testFencedLeaderRecovery.test.stdout
>
>
> org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest > 
> testFencedLeaderRecovery FAILED
> java.lang.AssertionError: expected 
> org.apache.kafka.connect.runtime.rest.errors.ConnectRestException to be 
> thrown, but nothing was thrown



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14093) Flaky ExactlyOnceSourceIntegrationTest.testFencedLeaderRecovery

2022-07-21 Thread Mickael Maison (Jira)
Mickael Maison created KAFKA-14093:
--

 Summary: Flaky 
ExactlyOnceSourceIntegrationTest.testFencedLeaderRecovery
 Key: KAFKA-14093
 URL: https://issues.apache.org/jira/browse/KAFKA-14093
 Project: Kafka
  Issue Type: Bug
Affects Versions: 3.3.0
Reporter: Mickael Maison
 Attachments: 
org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest.testFencedLeaderRecovery.test.stdout

org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest > 
testFencedLeaderRecovery FAILED
java.lang.AssertionError: expected 
org.apache.kafka.connect.runtime.rest.errors.ConnectRestException to be thrown, 
but nothing was thrown





--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14089) Flaky ExactlyOnceSourceIntegrationTest.testSeparateOffsetsTopic

2022-07-21 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17569309#comment-17569309
 ] 

Mickael Maison commented on KAFKA-14089:


I've hit this issue locally and I can reproduce it fairly consistently. See 
attached logs

[^org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest.testSeparateOffsetsTopic.test.stdout]
 

> Flaky ExactlyOnceSourceIntegrationTest.testSeparateOffsetsTopic
> ---
>
> Key: KAFKA-14089
> URL: https://issues.apache.org/jira/browse/KAFKA-14089
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.3.0
>Reporter: Mickael Maison
>Assignee: Chris Egerton
>Priority: Major
> Attachments: failure.txt, 
> org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest.testSeparateOffsetsTopic.test.stdout
>
>
> It looks like the sequence got broken around "65535, 65537, 65536, 65539, 
> 65538, 65541, 65540, 65543"



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14089) Flaky ExactlyOnceSourceIntegrationTest.testSeparateOffsetsTopic

2022-07-21 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14089?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison updated KAFKA-14089:
---
Attachment: 
org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest.testSeparateOffsetsTopic.test.stdout

> Flaky ExactlyOnceSourceIntegrationTest.testSeparateOffsetsTopic
> ---
>
> Key: KAFKA-14089
> URL: https://issues.apache.org/jira/browse/KAFKA-14089
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.3.0
>Reporter: Mickael Maison
>Assignee: Chris Egerton
>Priority: Major
> Attachments: failure.txt, 
> org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest.testSeparateOffsetsTopic.test.stdout
>
>
> It looks like the sequence got broken around "65535, 65537, 65536, 65539, 
> 65538, 65541, 65540, 65543"



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14090) Allow Kafka Streams to be configured to not create internal topics

2022-07-21 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17569291#comment-17569291
 ] 

Bruno Cadonna commented on KAFKA-14090:
---

I think this would be covered by 
[KIP-698|https://cwiki.apache.org/confluence/x/7CnZCQ]. With KIP-698 you can 
specify to disable the setup of internal topics if some topics are missing. 
Streams would then simply throw an exception and shut down. The KIP is accepted 
but unfortunately only partially implemented. If you are interested you are 
welcome to complete the implementation.

> Allow Kafka Streams to be configured to not create internal topics
> --
>
> Key: KAFKA-14090
> URL: https://issues.apache.org/jira/browse/KAFKA-14090
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 3.2.0
>Reporter: Abraham Leal
>Priority: Minor
>
> There should be a way to instruct Kafka Streams, through configuration, not to 
> create internal topics on start-up and to fail if the needed internal topics 
> aren't there.
> The reasoning for this option is governance of the application: an 
> organization may wish to disallow the creation of topics by clients and opt 
> for all topic creation to be done through administrators or a defined process. 
> Injecting this property in all clients would ensure good central governance of 
> the backing Kafka cluster.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14092) Unsafe memory access operation exception leading to Error processing append operation on partition

2022-07-21 Thread swathi mocharla (Jira)
swathi mocharla created KAFKA-14092:
---

 Summary: Unsafe memory access operation exception leading to Error 
processing append operation on partition
 Key: KAFKA-14092
 URL: https://issues.apache.org/jira/browse/KAFKA-14092
 Project: Kafka
  Issue Type: Bug
Affects Versions: 3.0.0
 Environment: K8S
Reporter: swathi mocharla


Hi,

We are frequently seeing an "Unsafe memory access operation" exception leading to 
an "Error processing append operation on partition" error on a certain partition.

Here are some logs:
{code:java}
{"process":"kafka.Kafka","service":"eventqueue","host":"hubudgen10-ncs-1-workerbm-2","container":"7efcf75afb20","system":"vtas250","neid":"123456","timezone":"UTC","type":"log","level":"info","time":"2022-07-20T11:15:10.62400Z","log":{"msg":"[ProducerStateManager
 partition=cdr-group-000-ccf-000-mgmt-0] Wrote producer snapshot at offset 3114 
with 0 producer ids in 47 ms.","class":"kafka.log.ProducerStateManager"}}
{"process":"kafka.Kafka","service":"eventqueue","host":"hubudgen10-ncs-1-workerbm-2","container":"7efcf75afb20","system":"vtas250","neid":"123456","timezone":"UTC","type":"log","level":"error","time":"2022-07-20T11:15:10.62800Z","log":{"msg":"[ReplicaManager
 broker=1] Error processing append operation on partition 
cdr-group-000-ccf-000-mgmt-0java.lang.InternalError: a fault occurred in a 
recent unsafe memory access operation in compiled Java 
code\njava.io.UnixFileSystem.getBooleanAttributes0(Native 
Method)\njava.io.UnixFileSystem.getBooleanAttributes(UnixFileSystem.java:242)\njava.io.File.exists(File.java:830)\nkafka.log.TransactionIndex.(TransactionIndex.scala:50)\nkafka.log.LogSegment$.open(LogSegment.scala:663)\nkafka.log.Log.$anonfun$roll$2(Log.scala:1692)\nkafka.log.Log.roll(Log.scala:2487)\nkafka.log.Log.maybeRoll(Log.scala:1632)\nkafka.log.Log.append(Log.scala:900)\nkafka.log.Log.appendAsLeader(Log.scala:741)\nkafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1042)\nkafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1030)\nkafka.server.ReplicaManager.$anonfun$appendToLocalLog$6(ReplicaManager.scala:931)\nscala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28)\nscala.collection.StrictOptimizedMapOps.map$(StrictOptimizedMapOps.scala:27)\nscala.collection.mutable.HashMap.map(HashMap.scala:35)\nkafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:919)\nkafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:591)\nkafka.server.KafkaApis.handleProduceRequest(KafkaApis.scala:658)\nkafka.server.KafkaApis.handle(KafkaApis.scala:169)\nkafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:75)\njava.lang.Thread.run(Thread.java:750)","class":"kafka.server.ReplicaManager"}}{code}
{code:java}
{"process":"kafka.Kafka","service":"eventqueue","host":"hubudgen10-ncs-1-workerbm-2","container":"7efcf75afb20","system":"vtas250","neid":"123456","timezone":"UTC","type":"log","level":"warning","time":"2022-07-20T11:15:11.63200Z","log":{"msg":"[Log
 partition=cdr-group-000-ccf-000-mgmt-0, dir=/data] Newly rolled segment file 
/data/cdr-group-000-ccf-000-mgmt-0/3114.log already exists; 
deleting it first","class":"kafka.log.Log"}}
{"process":"kafka.Kafka","service":"eventqueue","host":"hubudgen10-ncs-1-workerbm-2","container":"7efcf75afb20","system":"vtas250","neid":"123456","timezone":"UTC","type":"log","level":"error","time":"2022-07-20T11:15:11.63300Z","log":{"msg":"[ReplicaManager
 broker=1] Error processing append operation on partition 
cdr-group-000-ccf-000-mgmt-0java.lang.IllegalStateException: Attempt to append 
a timestamp (1658315708834) to slot 1 no larger than the last timestamp 
appended (8694993961132949504) to 

[jira] [Updated] (KAFKA-13217) Reconsider skipping the LeaveGroup on close() or add an overload that does so

2022-07-21 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13217?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-13217:

Labels: kip newbie newbie++  (was: needs-kip newbie newbie++)

> Reconsider skipping the LeaveGroup on close() or add an overload that does so
> -
>
> Key: KAFKA-13217
> URL: https://issues.apache.org/jira/browse/KAFKA-13217
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Sayantanu Dey
>Priority: Major
>  Labels: kip, newbie, newbie++
> Fix For: 3.3.0
>
>
> In Kafka Streams, when an instance is shut down via the close() API, we 
> intentionally skip sending a LeaveGroup request. This is because often the 
> shutdown is not due to a scaling down event but instead some transient 
> closure, such as during a rolling bounce. In cases where the instance is 
> expected to start up again shortly after, we originally wanted to avoid that 
> member's tasks from being redistributed across the remaining group members 
> since this would disturb the stable assignment and could cause unnecessary 
> state migration and restoration. We also hoped
> to limit the disruption to just a single rebalance, rather than forcing the 
> group to rebalance once when the member shuts down and then again when it 
> comes back up. So it's really an optimization  for the case in which the 
> shutdown is temporary.
>  
> That said, many of those optimizations are no longer necessary or at least 
> much less useful given recent features and improvements. For example 
> rebalances are now lightweight so skipping the 2nd rebalance is not as worth 
> optimizing for, and the new assignor will take into account the actual 
> underlying state for each task/partition assignment, rather than just the 
> previous assignment, so the assignment should be considerably more stable 
> across bounces and rolling restarts. 
>  
> Given that, it might be time to reconsider this optimization. Alternatively, 
> we could introduce another form of the close() API that forces the member to 
> leave the group, to be used in event of actual scale down rather than a 
> transient bounce.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-13217) Reconsider skipping the LeaveGroup on close() or add an overload that does so

2022-07-21 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13217?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-13217:

Description: 
In Kafka Streams, when an instance is shut down via the close() API, we 
intentionally skip sending a LeaveGroup request. This is because often the 
shutdown is not due to a scaling down event but instead some transient closure, 
such as during a rolling bounce. In cases where the instance is expected to 
start up again shortly after, we originally wanted to avoid that member's tasks 
from being redistributed across the remaining group members since this would 
disturb the stable assignment and could cause unnecessary state migration and 
restoration. We also hoped
to limit the disruption to just a single rebalance, rather than forcing the 
group to rebalance once when the member shuts down and then again when it comes 
back up. So it's really an optimization  for the case in which the shutdown is 
temporary.
 
That said, many of those optimizations are no longer necessary or at least much 
less useful given recent features and improvements. For example rebalances are 
now lightweight so skipping the 2nd rebalance is not as worth optimizing for, 
and the new assignor will take into account the actual underlying state for 
each task/partition assignment, rather than just the previous assignment, so 
the assignment should be considerably more stable across bounces and rolling 
restarts. 
 
Given that, it might be time to reconsider this optimization. Alternatively, we 
could introduce another form of the close() API that forces the member to leave 
the group, to be used in event of actual scale down rather than a transient 
bounce.

KIP-812: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-812%3A+Introduce+another+form+of+the+%60KafkaStreams.close%28%29%60+API+that+forces+the+member+to+leave+the+consumer+group]

  was:
In Kafka Streams, when an instance is shut down via the close() API, we 
intentionally skip sending a LeaveGroup request. This is because often the 
shutdown is not due to a scaling down event but instead some transient closure, 
such as during a rolling bounce. In cases where the instance is expected to 
start up again shortly after, we originally wanted to avoid that member's tasks 
from being redistributed across the remaining group members since this would 
disturb the stable assignment and could cause unnecessary state migration and 
restoration. We also hoped
to limit the disruption to just a single rebalance, rather than forcing the 
group to rebalance once when the member shuts down and then again when it comes 
back up. So it's really an optimization  for the case in which the shutdown is 
temporary.
 
That said, many of those optimizations are no longer necessary or at least much 
less useful given recent features and improvements. For example rebalances are 
now lightweight so skipping the 2nd rebalance is not as worth optimizing for, 
and the new assignor will take into account the actual underlying state for 
each task/partition assignment, rather than just the previous assignment, so 
the assignment should be considerably more stable across bounces and rolling 
restarts. 
 
Given that, it might be time to reconsider this optimization. Alternatively, we 
could introduce another form of the close() API that forces the member to leave 
the group, to be used in event of actual scale down rather than a transient 
bounce.


> Reconsider skipping the LeaveGroup on close() or add an overload that does so
> -
>
> Key: KAFKA-13217
> URL: https://issues.apache.org/jira/browse/KAFKA-13217
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Sayantanu Dey
>Priority: Major
>  Labels: kip, newbie, newbie++
> Fix For: 3.3.0
>
>
> In Kafka Streams, when an instance is shut down via the close() API, we 
> intentionally skip sending a LeaveGroup request. This is because often the 
> shutdown is not due to a scaling down event but instead some transient 
> closure, such as during a rolling bounce. In cases where the instance is 
> expected to start up again shortly after, we originally wanted to avoid that 
> member's tasks from being redistributed across the remaining group members 
> since this would disturb the stable assignment and could cause unnecessary 
> state migration and restoration. We also hoped
> to limit the disruption to just a single rebalance, rather than forcing the 
> group to rebalance once when the member shuts down and then again when it 
> comes back up. So it's really an optimization  for the case in which the 
> shutdown is temporary.
>  
> That said, many of those optimizations are no longer 

[jira] [Commented] (KAFKA-13217) Reconsider skipping the LeaveGroup on close() or add an overload that does so

2022-07-21 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13217?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17569283#comment-17569283
 ] 

Matthias J. Sax commented on KAFKA-13217:
-

Just reviewed the PR and linked the tickets. PR LGTM, so we can merge before we 
cut the first RC and ship the KIP.
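
For reference, a minimal usage sketch of the close() overload the KIP introduces, 
assuming the usual imports and a running KafkaStreams instance named streams 
(option names are as proposed in KIP-812):
{code:java}
KafkaStreams.CloseOptions options = new KafkaStreams.CloseOptions()
        .timeout(Duration.ofSeconds(30))
        .leaveGroup(true); // send a LeaveGroup so the member's tasks are reassigned immediately
streams.close(options);
{code}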

> Reconsider skipping the LeaveGroup on close() or add an overload that does so
> -
>
> Key: KAFKA-13217
> URL: https://issues.apache.org/jira/browse/KAFKA-13217
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Sayantanu Dey
>Priority: Major
>  Labels: needs-kip, newbie, newbie++
> Fix For: 3.3.0
>
>
> In Kafka Streams, when an instance is shut down via the close() API, we 
> intentionally skip sending a LeaveGroup request. This is because often the 
> shutdown is not due to a scaling down event but instead some transient 
> closure, such as during a rolling bounce. In cases where the instance is 
> expected to start up again shortly after, we originally wanted to avoid that 
> member's tasks from being redistributed across the remaining group members 
> since this would disturb the stable assignment and could cause unnecessary 
> state migration and restoration. We also hoped
> to limit the disruption to just a single rebalance, rather than forcing the 
> group to rebalance once when the member shuts down and then again when it 
> comes back up. So it's really an optimization  for the case in which the 
> shutdown is temporary.
>  
> That said, many of those optimizations are no longer necessary or at least 
> much less useful given recent features and improvements. For example 
> rebalances are now lightweight so skipping the 2nd rebalance is not as worth 
> optimizing for, and the new assignor will take into account the actual 
> underlying state for each task/partition assignment, rather than just the 
> previous assignment, so the assignment should be considerably more stable 
> across bounces and rolling restarts. 
>  
> Given that, it might be time to reconsider this optimization. Alternatively, 
> we could introduce another form of the close() API that forces the member to 
> leave the group, to be used in event of actual scale down rather than a 
> transient bounce.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14076) Fix issues with KafkaStreams.CloseOptions

2022-07-21 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17569282#comment-17569282
 ] 

Matthias J. Sax commented on KAFKA-14076:
-

Marking this as a blocker for 3.3, because this fixes KIP-812, which has a broken 
implementation right now. We need to either get this merged or revert the KIP.

> Fix issues with KafkaStreams.CloseOptions
> -
>
> Key: KAFKA-14076
> URL: https://issues.apache.org/jira/browse/KAFKA-14076
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jim Hughes
>Assignee: Jim Hughes
>Priority: Blocker
> Fix For: 3.3.0
>
>
> The new `close(CloseOptions)` function has a few bugs.  
> ([https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java#L1518-L1561)]
> Notably, it needs to remove CGs per StreamThread.
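
Not the actual fix, but a hedged sketch of the per-thread group-member removal the 
description refers to, using the existing Admin API 
(removeMembersFromConsumerGroup); the group id and group instance ids below are 
made up for illustration, and adminProps is assumed to be configured with the 
cluster's bootstrap servers:
{code:java}
try (Admin admin = Admin.create(adminProps)) {
    RemoveMembersFromConsumerGroupOptions options =
            new RemoveMembersFromConsumerGroupOptions(Arrays.asList(
                    new MemberToRemove("my-app-1-StreamThread-1"),
                    new MemberToRemove("my-app-1-StreamThread-2")));
    // Remove each StreamThread's static member so its tasks are reassigned right away.
    admin.removeMembersFromConsumerGroup("my-app", options).all().get();
}
{code}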



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14076) Fix issues with KafkaStreams.CloseOptions

2022-07-21 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-14076:

Priority: Blocker  (was: Major)

> Fix issues with KafkaStreams.CloseOptions
> -
>
> Key: KAFKA-14076
> URL: https://issues.apache.org/jira/browse/KAFKA-14076
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jim Hughes
>Assignee: Jim Hughes
>Priority: Blocker
> Fix For: 3.3.0
>
>
> The new `close(CloseOptions)` function has a few bugs.  
> ([https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java#L1518-L1561)]
> Notably, it needs to remove CGs per StreamThread.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14076) Fix issues with KafkaStreams.CloseOptions

2022-07-21 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-14076:

Fix Version/s: 3.3.0

> Fix issues with KafkaStreams.CloseOptions
> -
>
> Key: KAFKA-14076
> URL: https://issues.apache.org/jira/browse/KAFKA-14076
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jim Hughes
>Assignee: Jim Hughes
>Priority: Major
> Fix For: 3.3.0
>
>
> The new `close(CloseOptions)` function has a few bugs.  
> ([https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java#L1518-L1561)]
> Notably, it needs to remove CGs per StreamThread.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-14076) Fix issues with KafkaStreams.CloseOptions

2022-07-21 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-14076:
---

Assignee: Jim Hughes

> Fix issues with KafkaStreams.CloseOptions
> -
>
> Key: KAFKA-14076
> URL: https://issues.apache.org/jira/browse/KAFKA-14076
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jim Hughes
>Assignee: Jim Hughes
>Priority: Major
>
> The new `close(CloseOptions)` function has a few bugs.  
> ([https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java#L1518-L1561)]
> Notably, it needs to remove CGs per StreamThread.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] mjsax commented on a diff in pull request #12408: KAFKA-14076 Fix issues with KafkaStreams.CloseOptions

2022-07-21 Thread GitBox


mjsax commented on code in PR #12408:
URL: https://github.com/apache/kafka/pull/12408#discussion_r926285132


##
streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java:
##
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import kafka.server.KafkaConfig;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.CloseOptions;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static java.util.Collections.singletonList;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup;
+
+@Category({IntegrationTest.class})
+public class KafkaStreamsCloseOptionsIntegrationTest {
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
+    @Rule
+    public final TestName testName = new TestName();
+    private static MockTime mockTime;
+
+    @Rule
+    public final TemporaryFolder testFolder = new TemporaryFolder(TestUtils.tempDirectory());
+
+    protected static final String INPUT_TOPIC = "inputTopic";
+    protected static final String OUTPUT_TOPIC = "outputTopic";
+
+    protected Properties streamsConfig;
+    protected static KafkaStreams streams;
+    protected static Admin adminClient;
+    protected Properties commonClientConfig;
+    private Properties producerConfig;
+    protected Properties resultConsumerConfig;
+
+    public static final EmbeddedKafkaCluster CLUSTER;
+
+    static {
+        final Properties brokerProps = new Properties();
+        brokerProps.setProperty(KafkaConfig.GroupMaxSessionTimeoutMsProp(), Integer.toString(Integer.MAX_VALUE));
+        CLUSTER = new EmbeddedKafkaCluster(1, brokerProps);
+    }
+
+    @BeforeClass
+    public static void startCluster() throws IOException {
+        CLUSTER.start();
+    }
+
+    @AfterClass
+    public static void closeCluster() {
+        CLUSTER.stop();
+    }
+
+    @Before
+    public void before() throws Exception {
+        mockTime = CLUSTER.time;
+
+        final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+
+        commonClientConfig = new Properties();
+        commonClientConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+
+        streamsConfig = new Properties();
+        streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getRoot().getPath());
+        streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
+        streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+