[GitHub] [kafka] jasonyanwenl opened a new pull request #11241: KAFKA-13032: add NPE checker for KeyValueMapper

2021-08-19 Thread GitBox


jasonyanwenl opened a new pull request #11241:
URL: https://github.com/apache/kafka/pull/11241


   Currently, both `KStreamMap` and `KStreamFlatMap` throw an NPE if the call 
to `KeyValueMapper#apply` returns null. We should check whether the result of 
that call is null and throw an exception with a more meaningful error message 
for better debugging.
   
   Two unit tests are also added to check that the null result is caught.
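
   The null check described above could look roughly like the following. This is a minimal sketch, not the code from the PR: `PairMapper`, `NullCheckedMap`, `applyChecked`, and the message text are hypothetical stand-ins chosen so the example runs without Kafka on the classpath.

```java
// Hypothetical stand-in for a mapper callback; this is NOT the Kafka Streams interface.
interface PairMapper<K, V, R> {
    R apply(K key, V value);
}

class NullCheckedMap {
    // Applies the mapper and fails fast with a descriptive message if it returns null,
    // instead of letting a later dereference throw a bare NullPointerException.
    static <K, V, R> R applyChecked(PairMapper<K, V, R> mapper, K key, V value) {
        R result = mapper.apply(key, value);
        if (result == null) {
            throw new NullPointerException(
                "The provided mapper returned null, which is not allowed (key=" + key + ")");
        }
        return result;
    }

    public static void main(String[] args) {
        String ok = applyChecked((k, v) -> k + "=" + v, "a", 1);
        System.out.println(ok);
        try {
            applyChecked((k, v) -> null, "a", 1);
        } catch (NullPointerException e) {
            // The message now names the actual cause.
            System.out.println(e.getMessage());
        }
    }
}
```

   The exception type stays an NPE in this sketch; only the message changes, so callers that already catch NPE would behave the same.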
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] yangdaixai opened a new pull request #11240: KAFKA-13175; test

2021-08-19 Thread GitBox


yangdaixai opened a new pull request #11240:
URL: https://github.com/apache/kafka/pull/11240


   test
   
   Signed-off-by: yangdaixai 
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[GitHub] [kafka] vvcephei commented on pull request #11236: MINOR: Set session timeout back to 10s for Streams system tests

2021-08-19 Thread GitBox


vvcephei commented on pull request #11236:
URL: https://github.com/apache/kafka/pull/11236#issuecomment-902407705


   Thanks, @ableegoldman !
   
   I agree it might be overkill, and I wouldn't fault the conservative approach.
   
   The reason I went for a blanket approach is that it seems like 30 seconds is 
an awfully long time to wait for a session timeout in these scenarios, when we 
expect the full test run to complete within a couple of minutes. It seems like 
network glitches are pretty common when we run these, so I'm afraid we'll just 
be playing whack-a-mole by fixing individual tests as they time out due to this 
setting change.
   
   Thanks for the reference to your other PR!
   
   I just verified that we are now passing the decreased session timeout to 
that class via the properties file, so it should no longer be necessary to set 
it in the Java source. I've reverted that commit in this PR, and am re-running 
the build just to be on the safe side. Once it passes, I'll go ahead and merge 
it.
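
   As a rough illustration of reading the timeout from a properties file rather than hard-coding it, here is a minimal sketch. The class name, the method name, and the 30-second fallback are assumptions made for the example; only the `session.timeout.ms` key and the 10s/30s values come from this thread.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

class SessionTimeoutFromProps {
    // Fallback used only by this sketch when the properties text does not set the key.
    static final int FALLBACK_MS = 30_000;

    // Parses properties text and returns the configured session timeout in milliseconds.
    static int sessionTimeoutMs(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        String raw = props.getProperty("session.timeout.ms", Integer.toString(FALLBACK_MS));
        return Integer.parseInt(raw.trim());
    }

    public static void main(String[] args) {
        System.out.println(sessionTimeoutMs("session.timeout.ms=10000\n"));
        System.out.println(sessionTimeoutMs(""));
    }
}
```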






[jira] [Updated] (KAFKA-13212) fetch/findSessions queries with open endpoints for SessionStore

2021-08-19 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13212?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-13212:
--
Summary: fetch/findSessions queries with open endpoints for SessionStore  
(was: findSessions queries with open endpoints for SessionStore)

> fetch/findSessions queries with open endpoints for SessionStore
> ---
>
> Key: KAFKA-13212
> URL: https://issues.apache.org/jira/browse/KAFKA-13212
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-13212) findSessions queries with open endpoints for SessionStore

2021-08-19 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13212?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen reassigned KAFKA-13212:
-

Assignee: Luke Chen

> findSessions queries with open endpoints for SessionStore
> -
>
> Key: KAFKA-13212
> URL: https://issues.apache.org/jira/browse/KAFKA-13212
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
>






[jira] [Assigned] (KAFKA-13211) fetch queries with open endpoints for WindowStore

2021-08-19 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13211?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen reassigned KAFKA-13211:
-

Assignee: Luke Chen

> fetch queries with open endpoints for WindowStore
> -
>
> Key: KAFKA-13211
> URL: https://issues.apache.org/jira/browse/KAFKA-13211
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
>






[GitHub] [kafka] vvcephei commented on pull request #11236: MINOR: Set session timeout back to 10s for Streams system tests

2021-08-19 Thread GitBox


vvcephei commented on pull request #11236:
URL: https://github.com/apache/kafka/pull/11236#issuecomment-902390477


   System test results: 
http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2021-08-19--001.system-test-kafka-branch-builder--1629415396--vvcephei--MINOR-decrease-session-timeout-streams-systest--4947e6f130/report.html
   
   None of the streams tests failed.






[jira] [Commented] (KAFKA-13218) kafka deleted unexpired message unexpectedly

2021-08-19 Thread Haruki Okada (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17401943#comment-17401943
 ] 

Haruki Okada commented on KAFKA-13218:
--

One possible cause I can imagine is that logs are rolled based on the 
records' timestamps.

[https://kafka.apache.org/documentation/#upgrade_10_1_breaking]

If your producer supplies wrong timestamps, this could happen.
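
To make the timestamp point concrete, here is a minimal sketch of a time-based retention check. It is a simplified model and not the broker's code: `RetentionCheck`, `retentionBreached`, and the wall-clock value are assumptions for illustration. The retention value is the one from the report, and a record timestamp of 0 matches the `largestRecordTimestamp=Some(0)` shown in the reporter's broker log.

```java
class RetentionCheck {
    // A segment becomes eligible for time-based deletion once its newest record
    // is older than the retention period.
    static boolean retentionBreached(long largestRecordTimestampMs, long retentionMs, long nowMs) {
        return nowMs - largestRecordTimestampMs > retentionMs;
    }

    public static void main(String[] args) {
        long retentionMs = 6_048_000L;     // retention.ms from the report (about 100 minutes)
        long nowMs = 1_629_288_297_653L;   // hypothetical wall-clock time for this sketch

        // A record stamped with timestamp 0 looks decades old, so it is expired at once.
        System.out.println(retentionBreached(0L, retentionMs, nowMs));

        // A record stamped one second ago is well within the retention period.
        System.out.println(retentionBreached(nowMs - 1_000L, retentionMs, nowMs));
    }
}
```

Under this model, any producer that writes records with a timestamp of 0 would see them deleted at the next retention check, regardless of when they were actually appended.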

> kafka deleted unexpired message unexpectedly
> 
>
> Key: KAFKA-13218
> URL: https://issues.apache.org/jira/browse/KAFKA-13218
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.7.0
> Environment: docker file :
> from openjdk:11-jre-slim-buster
> RUN apt-get update
> RUN apt-get -y install net-tools iputils-ping curl procps
> RUN curl -OL 
> https://mirrors.bfsu.edu.cn/apache/kafka/2.7.0/kafka_2.13-2.7.0.tgz && tar 
> -xzf kafka_2.13-2.7.0.tgz && rm -f kafka_2.13-2.7.0.tgz
> ENV PATH "$PATH:/kafka_2.13-2.7.0/bin"
> RUN mkdir /etc/kafka
> COPY server.properties /etc/kafka/server.properties
> CMD ["kafka-server-start.sh", "/etc/kafka/server.properties"]
> configure file:
> broker.id=2
> log.dirs=/var/lib/kafka
> log.segment.bytes=10485760
> zookeeper.connect=zk-cs.default.svc.cluster.local:2181
> sasl.enabled.mechanisms=PLAIN
> sasl.mechanism.inter.broker.protocol=PLAIN 
> inter.broker.listener.name=INTERNAL
> listener.security.protocol.map=INTERNAL:SASL_PLAINTEXT,EXTERNAL:SASL_PLAINTEXT
> listeners=INTERNAL://:9092,EXTERNAL://:30101
> advertised.listeners=INTERNAL://kafka-2.kafka.default.svc.cluster.local:9092,EXTERNAL://192.168.0.13:30101
>Reporter: leiminghany
>Priority: Blocker
>
> I created a topic like this:
>  
> {code:java}
> kafka-topics.sh --create --zookeeper zk-cs.default.svc.cluster.local:2181 
> --partitions 64 --replication-factor 2 --topic signal --config 
> retention.ms=6048000{code}
> and then I sent several messages to partition 2 of this topic.
>  
> After that, I tried to consume the messages from this partition, but I 
> couldn't get any messages.
>  I looked at the Kafka data directory and found that the log file had been 
> rolled. Here are the files:
>  
> {code:java}
> root@kafka-2:/var/lib/kafka/signal-2# ls
> 0005.index  0005.log  
> 0005.snapshot  0005.timeindex  
> leader-epoch-checkpoint
> {code}
> and the dump info is :
>  
>  
> {code:java}
> root@kafka-2:/var/lib/kafka/signal-2# kafka-run-class.sh 
> kafka.tools.DumpLogSegments --deep-iteration --files 0005.log
> Dumping 0005.log
> Starting offset: 5
> root@kafka-2:/var/lib/kafka/signal-2# 
> root@kafka-2:/var/lib/kafka/signal-2# kafka-run-class.sh 
> kafka.tools.DumpLogSegments --deep-iteration --files 
> 0005.index 
> Dumping 0005.index
> root@kafka-2:/var/lib/kafka/signal-2# kafka-run-class.sh 
> kafka.tools.DumpLogSegments --deep-iteration --files 
> 0005.snapshot 
> Dumping 0005.snapshot
> root@kafka-2:/var/lib/kafka/signal-2# kafka-run-class.sh 
> kafka.tools.DumpLogSegments --deep-iteration --files 
> 0005.timeindex 
> Dumping 0005.timeindex
> timestamp: 0 offset: 5
> The following indexed offsets are not found in the log.
> Indexed offset: 5, found log offset: -1
> root@kafka-2:/var/lib/kafka/signal-2# cat leader-epoch-checkpoint 
> 0
> 1
> 0 5
> {code}
>  
> here is the kafka console log about this partition:
>  
> {code:java}
> [2021-08-18 12:04:57,652] INFO [ProducerStateManager partition=signal-2] 
> Writing producer snapshot at offset 5 (kafka.log.ProducerStateManager)
> [2021-08-18 12:04:57,653] INFO [Log partition=signal-2, dir=/var/lib/kafka] 
> Rolled new log segment at offset 5 in 7 ms. (kafka.log.Log)
> [2021-08-18 12:04:57,653] INFO [Log partition=signal-2, dir=/var/lib/kafka] 
> Deleting segment LogSegment(baseOffset=0, size=318, 
> lastModifiedTime=1629288220552, largestRecordTimestamp=Some(0)) due to 
> retention time 6048000ms breach based on the largest record timestamp in 
> the segment (kafka.log.Log)
> [2021-08-18 12:04:57,653] INFO [Log partition=signal-2, dir=/var/lib/kafka] 
> Incremented log start offset to 5 due to segment deletion (kafka.log.Log)
> [2021-08-18 12:05:57,671] INFO [Log partition=signal-2, dir=/var/lib/kafka] 
> Deleting segment files LogSegment(baseOffset=0, size=318, 
> lastModifiedTime=1629288220552, largestRecordTimestamp=Some(0)) 
> (kafka.log.Log)
> [2021-08-18 12:05:57,672] INFO Deleted log 
> /var/lib/kafka/signal-2/.log.deleted. 
> (kafka.log.LogSegment)
> [2021-08-18 12:05:57,672] INFO Deleted offset index 
> /var/lib/kafka/signal-2/.index.deleted. 
> (kafka.log.LogSegment)
> [2021-08-18 12:05:57,673] INFO Deleted time 

[jira] [Comment Edited] (KAFKA-13216) Streams left/outer joins cause new internal changelog topic to grow unbounded

2021-08-19 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13216?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17401940#comment-17401940
 ] 

Matthias J. Sax edited comment on KAFKA-13216 at 8/20/21, 1:04 AM:
---

We disabled the feature in 3.0.0, and will fix forward in 3.1.0: 
[https://github.com/apache/kafka/pull/11233] 


was (Author: mjsax):
We disabled the feature in 3.0.0, and will fix forward in 3.1.0

> Streams left/outer joins cause new internal changelog topic to grow unbounded
> -
>
> Key: KAFKA-13216
> URL: https://issues.apache.org/jira/browse/KAFKA-13216
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Sergio Peña
>Priority: Critical
> Fix For: 3.1.0
>
>
> This bug is caused by the improvements made in 
> https://issues.apache.org/jira/browse/KAFKA-10847, which fixes an issue with 
> stream-stream left/outer joins. The issue occurs only when a stream-stream 
> left/outer join is used with the new `JoinWindows.ofTimeDifferenceAndGrace()` 
> API that specifies the window time + grace period. This new API was added in 
> AK 3.0. No previous users are affected.
> The issue causes the internal changelog topic used by the new 
> OUTERSHARED window store to keep growing without bound as new records arrive. 
> The topic is never cleaned up or compacted, even if tombstones are written to 
> delete the joined and/or expired records from the window store. The problem 
> is caused by a parameter required in the window store to retain duplicates. 
> This config causes tombstone records to have a new sequence ID as part of 
> the key ID in the changelog, making those keys unique. As a result, the 
> cleanup policy does not work.
> In 3.0, we deprecated {{JoinWindows.of(size)}} in favor of 
> {{JoinWindows.ofTimeDifferenceAndGrace()}} -- the old API uses the old 
> semantics and is thus not affected, while the new API enables the new 
> semantics; the problem is that we deprecated the old API and thus tell users 
> that they should switch to the new broken API.
> We have two ways forward:
>  * Fix the bug (non trivial)
>  * Un-deprecate the old {{JoinWindows.of(size)}} API (and tell users not to 
> use the new but broken API)
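
The effect described in the issue, where a tombstone that carries a fresh sequence ID in its key never matches the record it is meant to delete, can be sketched with a toy compaction model. Everything here is hypothetical (`CompactionSketch`, `Rec`, the `#` separator, and the simplified compaction rule); it does not reproduce the window store's actual key format or the broker's log cleaner.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class CompactionSketch {
    // One changelog record; a null value marks a tombstone.
    static final class Rec {
        final String key;
        final String value;

        Rec(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    // Simplified compaction: keep the latest value per key and drop keys
    // whose latest record is a tombstone.
    static Map<String, String> compact(List<Rec> log) {
        Map<String, String> latest = new LinkedHashMap<>();
        for (Rec r : log) {
            if (r.value == null) {
                latest.remove(r.key);
            } else {
                latest.put(r.key, r.value);
            }
        }
        return latest;
    }

    // Writes a put followed by a delete for the same logical key. With appendSeq,
    // every write gets a new sequence number appended to its changelog key.
    static List<Rec> putThenDelete(String key, boolean appendSeq) {
        List<Rec> log = new ArrayList<>();
        int seq = 0;
        log.add(new Rec(appendSeq ? key + "#" + (seq++) : key, "v"));
        log.add(new Rec(appendSeq ? key + "#" + (seq++) : key, null));
        return log;
    }

    public static void main(String[] args) {
        // Same key for put and tombstone: the entry is removed.
        System.out.println(compact(putThenDelete("k", false)).size());
        // Tombstone key differs from the put key: the entry survives compaction.
        System.out.println(compact(putThenDelete("k", true)).size());
    }
}
```

In the second case the tombstone targets `k#1` while the value lives under `k#0`, so nothing is ever removed and the modelled log only grows.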





[jira] [Commented] (KAFKA-13216) Streams left/outer joins cause new internal changelog topic to grow unbounded

2021-08-19 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13216?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17401940#comment-17401940
 ] 

Matthias J. Sax commented on KAFKA-13216:
-

We disabled the feature in 3.0.0, and will fix forward in 3.1.0

> Streams left/outer joins cause new internal changelog topic to grow unbounded
> -
>
> Key: KAFKA-13216
> URL: https://issues.apache.org/jira/browse/KAFKA-13216
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Sergio Peña
>Priority: Critical
> Fix For: 3.1.0
>
>





[jira] [Updated] (KAFKA-13216) Streams left/outer joins cause new internal changelog topic to grow unbounded

2021-08-19 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13216?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-13216:

Fix Version/s: (was: 3.0.0)
   3.1.0

> Streams left/outer joins cause new internal changelog topic to grow unbounded
> -
>
> Key: KAFKA-13216
> URL: https://issues.apache.org/jira/browse/KAFKA-13216
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Sergio Peña
>Priority: Blocker
> Fix For: 3.1.0
>
>





[jira] [Updated] (KAFKA-13216) Streams left/outer joins cause new internal changelog topic to grow unbounded

2021-08-19 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13216?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-13216:

Priority: Critical  (was: Blocker)

> Streams left/outer joins cause new internal changelog topic to grow unbounded
> -
>
> Key: KAFKA-13216
> URL: https://issues.apache.org/jira/browse/KAFKA-13216
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Sergio Peña
>Priority: Critical
> Fix For: 3.1.0
>
>





[jira] [Updated] (KAFKA-13216) Streams left/outer joins cause new internal changelog topic to grow unbounded

2021-08-19 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13216?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-13216:

Affects Version/s: (was: 3.0.0)

> Streams left/outer joins cause new internal changelog topic to grow unbounded
> -
>
> Key: KAFKA-13216
> URL: https://issues.apache.org/jira/browse/KAFKA-13216
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Sergio Peña
>Priority: Blocker
> Fix For: 3.0.0
>
>





[GitHub] [kafka] rondagostino commented on a change in pull request #11239: KAFKA-13219: BrokerState metric not working for KRaft clusters

2021-08-19 Thread GitBox


rondagostino commented on a change in pull request #11239:
URL: https://github.com/apache/kafka/pull/11239#discussion_r692561636



##
File path: core/src/main/scala/kafka/server/KafkaBroker.scala
##
@@ -85,7 +85,8 @@ trait KafkaBroker extends KafkaMetricsGroup {
 explicitMetricName(Server.MetricsPrefix, KafkaBroker.MetricsTypeName, 
name, metricTags)
   }
 
-  newGauge("BrokerState", () => brokerState.value)
+  // visible for testing
+  private[server] val brokerStateGauge = newGauge("BrokerState", () => 
brokerState.value)

Review comment:
   Agreed, we don't need it; 
`ServerStartupTest.testBrokerStateRunningAfterZK()` goes directly against the 
brokerState method.  Will fix.








[GitHub] [kafka] rondagostino commented on a change in pull request #11239: KAFKA-13219: BrokerState metric not working for KRaft clusters

2021-08-19 Thread GitBox


rondagostino commented on a change in pull request #11239:
URL: https://github.com/apache/kafka/pull/11239#discussion_r692561312



##
File path: core/src/main/scala/kafka/server/BrokerServer.scala
##
@@ -84,6 +84,8 @@ class BrokerServer(
   val supportedFeatures: util.Map[String, VersionRange]
 ) extends KafkaBroker {
 
+  override def brokerState: BrokerState = currentState()

Review comment:
   Yeah, the same thought occurred to me, but I wanted to get a PR out 
there ASAP.  Will fix.








[GitHub] [kafka] ableegoldman commented on a change in pull request #11215: KAFKA-12994: Migrate TimeWindowsTest to new API

2021-08-19 Thread GitBox


ableegoldman commented on a change in pull request #11215:
URL: https://github.com/apache/kafka/pull/11215#discussion_r692546162



##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
##
@@ -95,27 +91,19 @@ public void advanceIntervalMustNotBeLargerThanWindowSize() {
 
 @Test
 public void gracePeriodShouldEnforceBoundaries() {
-TimeWindows.of(ofMillis(3L)).grace(ofMillis(0L));
+TimeWindows.ofSizeAndGrace(ofMillis(3L), ofMillis(0L));
 
 try {
-TimeWindows.of(ofMillis(3L)).grace(ofMillis(-1L));
+TimeWindows.ofSizeAndGrace(ofMillis(3L), ofMillis(-1L));
 fail("should not accept negatives");
 } catch (final IllegalArgumentException e) {
 //expected
 }
 }
 
-@Test
-public void oldAPIShouldSetDefaultGracePeriod() {
-assertEquals(Duration.ofDays(1).toMillis(), 
DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD);
-assertEquals(DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD - 3L, 
TimeWindows.of(ofMillis(3L)).gracePeriodMs());
-assertEquals(0L, 
TimeWindows.of(ofMillis(DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD)).gracePeriodMs());
-assertEquals(0L, 
TimeWindows.of(ofMillis(DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD + 
1L)).gracePeriodMs());
-}
-

Review comment:
   Same here: we should leave this test here until we remove the deprecated 
API. (and just suppress the warnings for only this test method)

##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
##
@@ -19,50 +19,46 @@
 import org.apache.kafka.streams.kstream.internals.TimeWindow;
 import org.junit.Test;
 
-import java.time.Duration;
 import java.util.Map;
 
 import static java.time.Duration.ofMillis;
 import static org.apache.kafka.streams.EqualityCheck.verifyEquality;
 import static org.apache.kafka.streams.EqualityCheck.verifyInEquality;
-import static 
org.apache.kafka.streams.kstream.Windows.DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.fail;
 
-@SuppressWarnings("deprecation")
 public class TimeWindowsTest {
 
 private static final long ANY_SIZE = 123L;
 private static final long ANY_GRACE = 1024L;
 
 @Test
 public void shouldSetWindowSize() {
-assertEquals(ANY_SIZE, TimeWindows.of(ofMillis(ANY_SIZE)).sizeMs);

Review comment:
   Let's leave this one in here. As long as we still support a deprecated 
API, we should continue to test that it works. We just shouldn't use deprecated 
APIs to test other functionality unrelated to the API itself (e.g. that an 
exception is thrown for a window size of 0).
   
   Then we can restrict the scope of the deprecation warning to just this one 
test, rather than the entire class.
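
   Scoping the suppression to a single method, as suggested above, could look like this. It is a minimal sketch with hypothetical names (`WindowSpec`, `ofSize`, `WindowSpecChecks`) and without JUnit, so it runs on its own; it is not the `TimeWindows` API or the test class from the PR.

```java
// Hypothetical API with an old, deprecated factory and its replacement.
class WindowSpec {
    final long sizeMs;

    private WindowSpec(long sizeMs) {
        this.sizeMs = sizeMs;
    }

    /** Old factory, kept for compatibility until it is removed. */
    @Deprecated
    static WindowSpec of(long sizeMs) {
        return new WindowSpec(sizeMs);
    }

    static WindowSpec ofSize(long sizeMs) {
        return new WindowSpec(sizeMs);
    }
}

// Note: no class-level @SuppressWarnings, so accidental use of deprecated
// methods elsewhere in this class still produces a compiler warning.
class WindowSpecChecks {
    // Only this check exercises the deprecated factory, so only it is annotated.
    @SuppressWarnings("deprecation")
    static long sizeViaDeprecatedFactory() {
        return WindowSpec.of(123L).sizeMs;
    }

    static long sizeViaNewFactory() {
        return WindowSpec.ofSize(123L).sizeMs;
    }

    public static void main(String[] args) {
        System.out.println(sizeViaDeprecatedFactory());
        System.out.println(sizeViaNewFactory());
    }
}
```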








[GitHub] [kafka] ableegoldman commented on pull request #11215: KAFKA-12994: Migrate TimeWindowsTest to new API

2021-08-19 Thread GitBox


ableegoldman commented on pull request #11215:
URL: https://github.com/apache/kafka/pull/11215#issuecomment-902302847


   Yep don't worry about it, that test is known to be flaky and unrelated. And 
yeah, the ability to set reviewers is restricted to committers. You can just 
ping people in a comment on your PR for reviews






[GitHub] [kafka] ijuma commented on a change in pull request #11239: KAFKA-13219: BrokerState metric not working for KRaft clusters

2021-08-19 Thread GitBox


ijuma commented on a change in pull request #11239:
URL: https://github.com/apache/kafka/pull/11239#discussion_r692520138



##
File path: core/src/main/scala/kafka/server/BrokerServer.scala
##
@@ -84,6 +84,8 @@ class BrokerServer(
   val supportedFeatures: util.Map[String, VersionRange]
 ) extends KafkaBroker {
 
+  override def brokerState: BrokerState = currentState()

Review comment:
   This is a bit broken. If kraft doesn't use the `_brokerState` field, we 
should not have it in the superclass.
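
   One way to read this suggestion is that the shared base type declares only an accessor, and each implementation owns its state. The sketch below is in Java rather than Scala and uses hypothetical names throughout (`State`, `BrokerBase`, `ZkStyleBroker`, `KRaftStyleBroker`). Only the fact that NOT_RUNNING maps to 0 comes from this thread; the value 3 for RUNNING is an assumption made for the example.

```java
import java.util.function.IntSupplier;

// Hypothetical state codes; only NOT_RUNNING = 0 is taken from the thread.
enum State {
    NOT_RUNNING(0), RUNNING(3);

    final int value;

    State(int value) {
        this.value = value;
    }
}

// The base type declares the accessor but holds no state field of its own.
abstract class BrokerBase {
    abstract State brokerState();

    // The gauge reads through the accessor every time it is polled,
    // so it always reflects the implementation's real state.
    final IntSupplier brokerStateGauge = () -> brokerState().value;
}

class ZkStyleBroker extends BrokerBase {
    private volatile State state = State.NOT_RUNNING;

    void start() {
        state = State.RUNNING;
    }

    @Override
    State brokerState() {
        return state;
    }
}

class KRaftStyleBroker extends BrokerBase {
    // Stands in for a separate lifecycle manager that tracks the state.
    private volatile State lifecycleState = State.NOT_RUNNING;

    void start() {
        lifecycleState = State.RUNNING;
    }

    @Override
    State brokerState() {
        return lifecycleState;
    }

    public static void main(String[] args) {
        KRaftStyleBroker broker = new KRaftStyleBroker();
        System.out.println(broker.brokerStateGauge.getAsInt());
        broker.start();
        System.out.println(broker.brokerStateGauge.getAsInt());
    }
}
```

   With no field in the base type, there is no second copy of the state that could stay at NOT_RUNNING while the implementation moves on.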








[GitHub] [kafka] ijuma commented on a change in pull request #11239: KAFKA-13219: BrokerState metric not working for KRaft clusters

2021-08-19 Thread GitBox


ijuma commented on a change in pull request #11239:
URL: https://github.com/apache/kafka/pull/11239#discussion_r692520138



##
File path: core/src/main/scala/kafka/server/BrokerServer.scala
##
@@ -84,6 +84,8 @@ class BrokerServer(
   val supportedFeatures: util.Map[String, VersionRange]
 ) extends KafkaBroker {
 
+  override def brokerState: BrokerState = currentState()

Review comment:
   This is a bit broken. If kraft doesn't use the `_brokerState` field, we 
should not have it.








[GitHub] [kafka] ijuma commented on a change in pull request #11239: KAFKA-13219: BrokerState metric not working for KRaft clusters

2021-08-19 Thread GitBox


ijuma commented on a change in pull request #11239:
URL: https://github.com/apache/kafka/pull/11239#discussion_r692518398



##
File path: core/src/main/scala/kafka/server/KafkaBroker.scala
##
@@ -85,7 +85,8 @@ trait KafkaBroker extends KafkaMetricsGroup {
 explicitMetricName(Server.MetricsPrefix, KafkaBroker.MetricsTypeName, 
name, metricTags)
   }
 
-  newGauge("BrokerState", () => brokerState.value)
+  // visible for testing
+  private[server] val brokerStateGauge = newGauge("BrokerState", () => 
brokerState.value)

Review comment:
   Why do we need this versus the `brokerState` method?








[GitHub] [kafka] izzyacademy commented on a change in pull request #11235: KAFKA-13216: write correct tombstones into stream-stream join store changelog

2021-08-19 Thread GitBox


izzyacademy commented on a change in pull request #11235:
URL: https://github.com/apache/kafka/pull/11235#discussion_r692514791



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RawKeyAccessor.java
##
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals.metrics;
+
+import java.util.Collection;
+import org.apache.kafka.common.utils.Bytes;
+
+public interface RawKeyAccessor {

Review comment:
   @mjsax could we add a brief doc to explain what the interface is used 
for?








[GitHub] [kafka] rondagostino opened a new pull request #11239: KAFKA-13219: BrokerState metric not working for KRaft clusters

2021-08-19 Thread GitBox


rondagostino opened a new pull request #11239:
URL: https://github.com/apache/kafka/pull/11239


   The BrokerState metric always has a value of 0, for NOT_RUNNING, in KRaft 
clusters. This patch fixes it and adds a test.
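
The failure mode described above can be sketched as a Python analogy. It assumes, as the review diff on `BrokerServer` suggests, that the gauge reads an accessor backed by a base-class field the KRaft broker never updates. All class and attribute names below are hypothetical, and the value 3 for a running broker is an assumption; this is not the Scala code from the patch.

```python
# Hypothetical analogy of the BrokerState gauge bug; names are illustrative only.
NOT_RUNNING = 0  # value stated in the description above
RUNNING = 3      # assumed value, used only for illustration


class Broker:
    """Base class whose gauge reports whatever `broker_state` returns."""

    def __init__(self):
        self._broker_state = NOT_RUNNING  # field that only some subclasses maintain

    @property
    def broker_state(self):
        return self._broker_state

    def gauge_value(self):
        # The gauge is evaluated lazily, so it sees an overridden accessor.
        return self.broker_state


class BrokenKRaftBroker(Broker):
    """Tracks its state elsewhere and never touches `_broker_state`."""

    def __init__(self):
        super().__init__()
        self._lifecycle_state = RUNNING


class FixedKRaftBroker(BrokenKRaftBroker):
    """Overrides the accessor so the gauge reads the real state."""

    @property
    def broker_state(self):
        return self._lifecycle_state


broken_value = BrokenKRaftBroker().gauge_value()
fixed_value = FixedKRaftBroker().gauge_value()
```

In this sketch the broken broker always reports 0, while the fixed one reports its actual lifecycle state because the gauge goes through the overridden accessor.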
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[GitHub] [kafka] hachikuji merged pull request #11237: MINOR: Fix how the last broker id is computed

2021-08-19 Thread GitBox


hachikuji merged pull request #11237:
URL: https://github.com/apache/kafka/pull/11237


   






[jira] [Updated] (KAFKA-8613) Make Grace Period Mandatory for Windowed Operations in Streams

2021-08-19 Thread Israel Ekpo (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8613?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Israel Ekpo updated KAFKA-8613:
---
Description: 
Currently, the grace period is set to the retention time if the grace period is 
not specified explicitly. The reason for setting the default grace period to 
the retention time was backward compatibility. Topologies that were implemented 
before the introduction of the grace period added late-arriving records to a 
window as long as the window existed, i.e., as long as its retention time had 
not elapsed.

This unintuitive default grace period has already caused confusion among users.

For the next major release, we should consider setting the default grace period 
to {{Duration.ZERO}} or making it a mandatory parameter.

 

KIP-633 

[https://cwiki.apache.org/confluence/x/Ho2NCg]

 

  was:
Currently, the grace period is set to the retention time if the grace period is 
not specified explicitly. The reason for setting the default grace period to 
the retention time was backward compatibility. Topologies that were implemented 
before the introduction of the grace period added late-arriving records to a 
window as long as the window existed, i.e., as long as its retention time had 
not elapsed.

This unintuitive default grace period has already caused confusion among users.

For the next major release, we should consider setting the default grace period 
to {{Duration.ZERO}} or making it a mandatory parameter.

 

KIP-633 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-633%3A+Drop+24+hour+default+of+grace+period+in+Streams]


> Make Grace Period Mandatory for Windowed Operations in Streams
> --
>
> Key: KAFKA-8613
> URL: https://issues.apache.org/jira/browse/KAFKA-8613
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Bruno Cadonna
>Assignee: Israel Ekpo
>Priority: Blocker
>  Labels: kip
> Fix For: 3.0.0
>
>
> Currently, the grace period is set to the retention time if the grace period is 
> not specified explicitly. The reason for setting the default grace period to 
> the retention time was backward compatibility. Topologies that were implemented 
> before the introduction of the grace period added late-arriving records to a 
> window as long as the window existed, i.e., as long as its retention time had 
> not elapsed.
> This unintuitive default grace period has already caused confusion among 
> users.
> For the next major release, we should consider setting the default grace 
> period to {{Duration.ZERO}} or making it a mandatory parameter.
>  
> KIP-633 
> [https://cwiki.apache.org/confluence/x/Ho2NCg]
>  
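
To make the grace-period semantics in the description concrete, here is a minimal Python sketch; it is not Kafka Streams code. It assumes a window keeps accepting records until the observed stream time reaches the window end plus the grace period. The function name, parameters, and the one-day value (taken from the "24 hour default" wording in the KIP-633 title) are illustrative assumptions.

```python
def accepts_late_record(window_end_ms, grace_ms, stream_time_ms):
    """Return True if a window ending at window_end_ms still accepts records.

    Assumption: the window closes once stream time reaches
    window_end_ms + grace_ms.
    """
    return stream_time_ms < window_end_ms + grace_ms


ONE_DAY_MS = 24 * 60 * 60 * 1000

# A record for a window that ended at t=60s arrives when stream time is 90s.
with_long_default = accepts_late_record(60_000, ONE_DAY_MS, 90_000)
with_zero_grace = accepts_late_record(60_000, 0, 90_000)
```

With the long implicit default the late record is still added to the window; with a grace period of zero it is dropped, which is why the choice needs to be explicit.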



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13219) BrokerState metric not working for KRaft clusters

2021-08-19 Thread Ron Dagostino (Jira)
Ron Dagostino created KAFKA-13219:
-

 Summary: BrokerState metric not working for KRaft clusters
 Key: KAFKA-13219
 URL: https://issues.apache.org/jira/browse/KAFKA-13219
 Project: Kafka
  Issue Type: Bug
  Components: kraft
Affects Versions: 3.0.0
Reporter: Ron Dagostino
Assignee: Ron Dagostino


The BrokerState metric always has a value of 0, for NOT_RUNNING, in KRaft 
clusters





[jira] [Updated] (KAFKA-8613) Make Grace Period Mandatory for Windowed Operations in Streams

2021-08-19 Thread Israel Ekpo (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8613?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Israel Ekpo updated KAFKA-8613:
---
Summary: Make Grace Period Mandatory for Windowed Operations in Streams  
(was: Make Grace Period Mandatory in Windowed Operations)

> Make Grace Period Mandatory for Windowed Operations in Streams
> --
>
> Key: KAFKA-8613
> URL: https://issues.apache.org/jira/browse/KAFKA-8613
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Bruno Cadonna
>Assignee: Israel Ekpo
>Priority: Blocker
>  Labels: kip
> Fix For: 3.0.0
>
>
> Currently, the grace period is set to the retention time if the grace period is 
> not specified explicitly. The reason for setting the default grace period to 
> the retention time was backward compatibility. Topologies that were implemented 
> before the introduction of the grace period added late-arriving records to a 
> window as long as the window existed, i.e., as long as its retention time had 
> not elapsed.
> This unintuitive default grace period has already caused confusion among 
> users.
> For the next major release, we should consider setting the default grace 
> period to {{Duration.ZERO}} or making it a mandatory parameter.
>  
> KIP-633 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-633%3A+Drop+24+hour+default+of+grace+period+in+Streams]





[jira] [Updated] (KAFKA-8613) Make Grace Period Mandatory in Windowed Operations

2021-08-19 Thread Israel Ekpo (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8613?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Israel Ekpo updated KAFKA-8613:
---
Summary: Make Grace Period Mandatory in Windowed Operations  (was: Make 
grace period mandatory)

> Make Grace Period Mandatory in Windowed Operations
> --
>
> Key: KAFKA-8613
> URL: https://issues.apache.org/jira/browse/KAFKA-8613
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Bruno Cadonna
>Assignee: Israel Ekpo
>Priority: Blocker
>  Labels: kip
> Fix For: 3.0.0
>
>
> Currently, the grace period is set to the retention time if the grace period is 
> not specified explicitly. The reason for setting the default grace period to 
> the retention time was backward compatibility. Topologies that were implemented 
> before the introduction of the grace period added late-arriving records to a 
> window as long as the window existed, i.e., as long as its retention time had 
> not elapsed.
> This unintuitive default grace period has already caused confusion among 
> users.
> For the next major release, we should consider setting the default grace 
> period to {{Duration.ZERO}} or making it a mandatory parameter.
>  
> KIP-633 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-633%3A+Drop+24+hour+default+of+grace+period+in+Streams]





[GitHub] [kafka] rondagostino opened a new pull request #11238: MINOR: Fix force kill of KRaft colocated controllers in system tests

2021-08-19 Thread GitBox


rondagostino opened a new pull request #11238:
URL: https://github.com/apache/kafka/pull/11238


   I noticed that a system test using a KRaft cluster with 3 brokers but only 1 
co-located controller did not force-kill the second and third broker after 
shutting down the first broker (the one with the controller).  The issue was a 
floating point rounding error.  This patch adjusts for the rounding error and 
also makes the logic work for an even number of controllers.  A local run of 
`tests/kafkatest/sanity_checks/test_bounce.py` succeeded (and I manually 
increased the cluster size for the 1 co-located controller case and observed 
the correct kill behavior: the second and third brokers were force-killed as 
expected).
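
The kind of rounding problem described above can be illustrated with a small Python sketch. This is not the code from the patch: `majority_rounded` and `majority_integer` are hypothetical helpers, and the assumption is that the test needs the size of a controller majority to decide when the remaining brokers must be force-killed.

```python
def majority_rounded(num_controllers):
    """Floating-point version: Python 3 rounds halves to the nearest even integer."""
    return round(num_controllers / 2.0)


def majority_integer(num_controllers):
    """Integer version: the smallest count that is strictly more than half."""
    return num_controllers // 2 + 1


controller_counts = (1, 2, 3, 4, 5)
rounded = [majority_rounded(n) for n in controller_counts]
exact = [majority_integer(n) for n in controller_counts]
```

In this sketch the rounded version yields 0 for a single controller and undercounts for even controller counts, which is consistent with the two symptoms named above. Integer arithmetic avoids both.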
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[GitHub] [kafka] jsancio opened a new pull request #11237: MINOR: Fix how the last broker id is computed

2021-08-19 Thread GitBox


jsancio opened a new pull request #11237:
URL: https://github.com/apache/kafka/pull/11237


   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[jira] [Comment Edited] (KAFKA-9320) Enable TLSv1.3 by default and disable some of the older protocols

2021-08-19 Thread Yiming Zang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9320?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17401818#comment-17401818
 ] 

Yiming Zang edited comment on KAFKA-9320 at 8/19/21, 6:45 PM:
--

We have seen a regression after enabling and upgrading to TLS 1.3 with Kafka 
version 2.7.0. We have been seeing very frequent EOFExceptions and 
disconnections:
{code:java}
[2021-08-13 06:07:26,069] WARN [ReplicaFetcher replicaId=18, leaderId=20, fetcherId=0] Unexpected error from atla-alo-26-sr1.prod.twttr.net/10.41.44.125; closing connection (org.apache.kafka.common.network.Selector)
java.io.EOFException: EOF during read
    at org.apache.kafka.common.network.SslTransportLayer.read(SslTransportLayer.java:627)
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:118)
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:466)
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:416)
    at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:729)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:620)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:520)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:562)
    at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:96)
    at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:110)
    at kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:211)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:310)
    at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:143)
    at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:142)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:122)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
{code}

We had to roll back to TLS 1.2, and that resolved the EOFException issue.


was (Author: yzang):
We have seen a regression after enabling and upgrading to TLS 1.3 with Kafka 
version 2.7.0. We have been seeing very frequent EOFExceptions and 
disconnections:
[2021-08-13 06:07:26,069] WARN [ReplicaFetcher replicaId=18, leaderId=20, fetcherId=0] Unexpected error from atla-alo-26-sr1.prod.twttr.net/10.41.44.125; closing connection (org.apache.kafka.common.network.Selector)
java.io.EOFException: EOF during read
    at org.apache.kafka.common.network.SslTransportLayer.read(SslTransportLayer.java:627)
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:118)
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:466)
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:416)
    at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:729)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:620)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:520)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:562)
    at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:96)
    at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:110)
    at kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:211)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:310)
    at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:143)
    at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:142)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:122)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
We had to roll back to TLS 1.2, and that resolved the EOFException issue.

> Enable TLSv1.3 by default and disable some of the older protocols
> -
>
> Key: KAFKA-9320
> URL: https://issues.apache.org/jira/browse/KAFKA-9320
> Project: Kafka
>  Issue Type: New Feature
>  Components: security
>Reporter: Rajini Sivaram
>Assignee: Nikolay Izhikov
>Priority: Major
>  Labels: needs-kip
> Fix For: 2.6.0
>
> Attachments: report.txt
>
>
> KAFKA-7251 added support for TLSv1.3. We should include this in the list of 
> protocols that are enabled by default. We should also disable some of the 
> older protocols that are not secure. This change requires a KIP.





[jira] [Commented] (KAFKA-9320) Enable TLSv1.3 by default and disable some of the older protocols

2021-08-19 Thread Yiming Zang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9320?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17401818#comment-17401818
 ] 

Yiming Zang commented on KAFKA-9320:


We have seen a regression after enabling and upgrading to TLS 1.3 with Kafka 
version 2.7.0. We have been seeing very frequent EOFExceptions and 
disconnections:
[2021-08-13 06:07:26,069] WARN [ReplicaFetcher replicaId=18, leaderId=20, fetcherId=0] Unexpected error from atla-alo-26-sr1.prod.twttr.net/10.41.44.125; closing connection (org.apache.kafka.common.network.Selector)
java.io.EOFException: EOF during read
    at org.apache.kafka.common.network.SslTransportLayer.read(SslTransportLayer.java:627)
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:118)
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:466)
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:416)
    at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:729)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:620)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:520)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:562)
    at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:96)
    at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:110)
    at kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:211)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:310)
    at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:143)
    at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:142)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:122)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
We had to roll back to TLS 1.2, and that resolved the EOFException issue.

> Enable TLSv1.3 by default and disable some of the older protocols
> -
>
> Key: KAFKA-9320
> URL: https://issues.apache.org/jira/browse/KAFKA-9320
> Project: Kafka
>  Issue Type: New Feature
>  Components: security
>Reporter: Rajini Sivaram
>Assignee: Nikolay Izhikov
>Priority: Major
>  Labels: needs-kip
> Fix For: 2.6.0
>
> Attachments: report.txt
>
>
> KAFKA-7251 added support for TLSv1.3. We should include this in the list of 
> protocols that are enabled by default. We should also disable some of the 
> older protocols that are not secure. This change requires a KIP.





[jira] [Updated] (KAFKA-12984) Cooperative sticky assignor can get stuck with invalid SubscriptionState input metadata

2021-08-19 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12984?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-12984:
---
Fix Version/s: 2.8.1

> Cooperative sticky assignor can get stuck with invalid SubscriptionState 
> input metadata
> ---
>
> Key: KAFKA-12984
> URL: https://issues.apache.org/jira/browse/KAFKA-12984
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: A. Sophie Blee-Goldman
>Assignee: A. Sophie Blee-Goldman
>Priority: Blocker
> Fix For: 3.0.0, 2.8.1
>
>
> Some users have reported seeing their consumer group become stuck in the 
> CompletingRebalance phase when using the cooperative-sticky assignor. Based 
> on the request metadata we were able to deduce that multiple consumers were 
> reporting the same partition(s) in their "ownedPartitions" field of the 
> consumer protocol. Since this is an invalid state, the input causes the 
> cooperative-sticky assignor to detect that something is wrong and throw an 
> IllegalStateException. If the consumer application is set up to simply retry, 
> this will cause the group to appear to hang in the rebalance state.
> The "ownedPartitions" field is encoded based on the ConsumerCoordinator's 
> SubscriptionState, which was assumed to always be up to date. However there 
> may be cases where the consumer has dropped out of the group but fails to 
> clear the SubscriptionState, allowing it to report some partitions as owned 
> that have since been reassigned to another member.
> We should (a) fix the sticky assignment algorithm to resolve cases of 
> improper input conditions by invalidating the "ownedPartitions" in cases of 
> double ownership, and (b) shore up the ConsumerCoordinator logic to better 
> handle rejoining the group and keeping its internal state consistent. See 
> KAFKA-12983 for more details on (b)
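
Step (a) above, invalidating "ownedPartitions" in cases of double ownership, can be sketched in Python as follows. This is only an illustration under simple assumptions: `invalidate_double_owned` is a hypothetical helper, partitions are plain strings, and every contested claim is dropped from all claimants. The actual assignor may resolve conflicts differently.

```python
from collections import defaultdict


def invalidate_double_owned(owned_partitions):
    """Drop any partition claimed by more than one consumer from every claim.

    owned_partitions maps a consumer id to the partitions it reports as owned.
    Returns a new mapping containing only uncontested claims.
    """
    claimants = defaultdict(list)
    for consumer, partitions in owned_partitions.items():
        for partition in partitions:
            claimants[partition].append(consumer)

    contested = {p for p, owners in claimants.items() if len(owners) > 1}
    return {
        consumer: [p for p in partitions if p not in contested]
        for consumer, partitions in owned_partitions.items()
    }


reported = {
    "consumer-a": ["topic-0", "topic-1"],
    "consumer-b": ["topic-1", "topic-2"],  # stale claim on topic-1
}
cleaned = invalidate_double_owned(reported)
```

After the cleanup, `topic-1` is owned by nobody and can be assigned like any other unowned partition, so the assignor no longer sees an invalid input.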





[GitHub] [kafka] jolshan commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path

2021-08-19 Thread GitBox


jolshan commented on a change in pull request #11170:
URL: https://github.com/apache/kafka/pull/11170#discussion_r692343425



##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -1755,6 +1765,78 @@ class ReplicaManager(val config: KafkaConfig,
 partitionsToMakeFollower
   }
 
+  private def updateTopicIdForFollowers(controllerId: Int,
+controllerEpoch: Int,
+partitionStates: Map[Partition, 
LeaderAndIsrPartitionState],
+correlationId: Int,
+responseMap: 
mutable.Map[TopicPartition, Errors],
+topicIds: String => Option[Uuid]) : 
Set[Partition] = {

Review comment:
   I think you are right. makeFollowers needs it to assign the topic ID to 
the log for the first time, but for this PR, we've assigned it above.








[GitHub] [kafka] jolshan commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path

2021-08-19 Thread GitBox


jolshan commented on a change in pull request #11170:
URL: https://github.com/apache/kafka/pull/11170#discussion_r692338540



##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -1755,6 +1765,78 @@ class ReplicaManager(val config: KafkaConfig,
 partitionsToMakeFollower
   }
 
+  private def updateTopicIdForFollowers(controllerId: Int,

Review comment:
   there is an issue with alter isr which is partially explained in this 
comment:
   ```
   * With the addition of AlterIsr, we also consider newly added replicas as part of the ISR when advancing
   * the HW. These replicas have not yet been committed to the ISR by the controller, so we could revert to the previously
   * committed ISR. However, adding additional replicas to the ISR makes it more restrictive and therefore safe. We call
   * this set the "maximal" ISR. See KIP-497 for more details
   ```








[GitHub] [kafka] vvcephei commented on a change in pull request #11236: MINOR: Set session timeout back to 10s for Streams system tests

2021-08-19 Thread GitBox


vvcephei commented on a change in pull request #11236:
URL: https://github.com/apache/kafka/pull/11236#discussion_r692335138



##
File path: tests/kafkatest/services/streams.py
##
@@ -686,12 +691,13 @@ def prop_file(self):
   streams_property.KAFKA_SERVERS: 
self.kafka.bootstrap_servers(),
   streams_property.NUM_THREADS: self.NUM_THREADS,
   consumer_property.GROUP_INSTANCE_ID: 
self.GROUP_INSTANCE_ID,
-  consumer_property.SESSION_TIMEOUT_MS: 6}
-
-properties['input.topic'] = self.INPUT_TOPIC
+  consumer_property.SESSION_TIMEOUT_MS: 6,
+  'input.topic': self.INPUT_TOPIC,
+  "acceptable.recovery.lag": "9223372036854775807", # enable a one-shot assignment
+  "session.timeout.ms": "10000" # set back to 10s for tests. See KIP-735
+  }
 
 # Long.MAX_VALUE lets us do the assignment without a warmup

Review comment:
   ```suggestion
   ```








[GitHub] [kafka] vvcephei commented on a change in pull request #11236: MINOR: Set session timeout back to 10s for Streams system tests

2021-08-19 Thread GitBox


vvcephei commented on a change in pull request #11236:
URL: https://github.com/apache/kafka/pull/11236#discussion_r692334950



##
File path: tests/kafkatest/services/streams.py
##
@@ -659,14 +663,15 @@ def __init__(self, test_context, kafka):
 
 def prop_file(self):
 properties = {streams_property.STATE_DIR: self.PERSISTENT_ROOT,
-  streams_property.KAFKA_SERVERS: 
self.kafka.bootstrap_servers()}
-
-properties['input.topic'] = self.INPUT_TOPIC
-properties['aggregation.topic'] = self.AGGREGATION_TOPIC
-properties['add.operations'] = self.ADD_ADDITIONAL_OPS
+  streams_property.KAFKA_SERVERS: 
self.kafka.bootstrap_servers(),
+  'input.topic': self.INPUT_TOPIC,
+  'aggregation.topic': self.AGGREGATION_TOPIC,
+  'add.operations': self.ADD_ADDITIONAL_OPS,
+  "acceptable.recovery.lag": "9223372036854775807", # enable a one-shot assignment
+  "session.timeout.ms": "10000" # set back to 10s for tests. See KIP-735
+  }
 
 # Long.MAX_VALUE lets us do the assignment without a warmup

Review comment:
   ```suggestion
   ```








[GitHub] [kafka] vvcephei commented on a change in pull request #11236: MINOR: Set session timeout back to 10s for Streams system tests

2021-08-19 Thread GitBox


vvcephei commented on a change in pull request #11236:
URL: https://github.com/apache/kafka/pull/11236#discussion_r692334355



##
File path: tests/kafkatest/services/streams.py
##
@@ -569,16 +572,17 @@ def __init__(self, test_context, kafka):
 
 def prop_file(self):
 properties = {streams_property.STATE_DIR: self.PERSISTENT_ROOT,
-  streams_property.KAFKA_SERVERS: 
self.kafka.bootstrap_servers()}
-
-properties['topology.optimization'] = self.OPTIMIZED_CONFIG
-properties['input.topic'] = self.INPUT_TOPIC
-properties['aggregation.topic'] = self.AGGREGATION_TOPIC
-properties['reduce.topic'] = self.REDUCE_TOPIC
-properties['join.topic'] = self.JOIN_TOPIC
+  streams_property.KAFKA_SERVERS: 
self.kafka.bootstrap_servers(),
+  'topology.optimization': self.OPTIMIZED_CONFIG,
+  'input.topic': self.INPUT_TOPIC,
+  'aggregation.topic': self.AGGREGATION_TOPIC,
+  'reduce.topic': self.REDUCE_TOPIC,
+  'join.topic': self.JOIN_TOPIC,
+  "acceptable.recovery.lag": "9223372036854775807", # enable a one-shot assignment
+  "session.timeout.ms": "10000" # set back to 10s for tests. See KIP-735
+  }
 
 # Long.MAX_VALUE lets us do the assignment without a warmup

Review comment:
   ```suggestion
   ```








[GitHub] [kafka] vvcephei commented on pull request #11236: MINOR: Set session timeout back to 10s for Streams system tests

2021-08-19 Thread GitBox


vvcephei commented on pull request #11236:
URL: https://github.com/apache/kafka/pull/11236#issuecomment-902094127


   system test run: 
https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4657/






[jira] [Commented] (KAFKA-13152) Replace "buffered.records.per.partition" with "input.buffer.max.bytes"

2021-08-19 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17401763#comment-17401763
 ] 

Sagar Rao commented on KAFKA-13152:
---

Thanks [~mjsax]/[~guozhang]. I will start writing a KIP and send out an email 
soon.

> Replace "buffered.records.per.partition" with "input.buffer.max.bytes" 
> ---
>
> Key: KAFKA-13152
> URL: https://issues.apache.org/jira/browse/KAFKA-13152
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Sagar Rao
>Priority: Major
>  Labels: needs-kip
>
> The current config "buffered.records.per.partition" controls the maximum 
> number of records to bookkeep; once it is exceeded, we pause fetching from 
> this partition. However, this config has two issues:
> * It's a per-partition config, so the total memory consumed depends on 
> the dynamic number of partitions assigned.
> * Record size can vary from case to case.
> Hence it's hard to bound the memory usage for this buffering. We should 
> consider deprecating that config in favor of a global one, e.g. 
> "input.buffer.max.bytes", which controls how many bytes in total are allowed 
> to be buffered. This is doable since we buffer the raw records in .
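
The difference between a per-partition record count and a global byte bound can be sketched in Python. This is a hypothetical model, not Kafka Streams code: the `InputBuffer` class, its methods, and the byte sizes are assumptions chosen only to show that one global limit bounds memory regardless of how many partitions are assigned.

```python
class InputBuffer:
    """Tracks buffered bytes across all partitions against one global limit."""

    def __init__(self, max_bytes):
        self.max_bytes = max_bytes
        self.bytes_per_partition = {}

    def add(self, partition, record_size_bytes):
        current = self.bytes_per_partition.get(partition, 0)
        self.bytes_per_partition[partition] = current + record_size_bytes

    def total_bytes(self):
        return sum(self.bytes_per_partition.values())

    def should_pause_fetching(self):
        # Pause once the total exceeds the limit, however many partitions exist.
        return self.total_bytes() > self.max_bytes


buffer = InputBuffer(max_bytes=1000)
buffer.add("topic-0", 400)
buffer.add("topic-1", 500)
paused_before = buffer.should_pause_fetching()
buffer.add("topic-2", 200)
paused_after = buffer.should_pause_fetching()
```

Assigning a third partition pushes the total over the limit here, whereas a per-partition record count would have allowed the buffer to keep growing with each new partition.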





[GitHub] [kafka] officialpatterson commented on pull request #11215: kafka-12994: Migrate TimeWindowsTest to new API

2021-08-19 Thread GitBox


officialpatterson commented on pull request #11215:
URL: https://github.com/apache/kafka/pull/11215#issuecomment-902088249


   It doesn't look like I have permission to assign reviewers to this PR, 
@ableegoldman.
   
   Also, I'm fairly new to this, but it looks like these test failures are down to 
a flaky integration test. Is that correct?






[GitHub] [kafka] vvcephei opened a new pull request #11236: MINOR: Set session timeout back to 10s for Streams system tests

2021-08-19 Thread GitBox


vvcephei opened a new pull request #11236:
URL: https://github.com/apache/kafka/pull/11236


   We increased the default session timeout to 30s in KIP-735:
   
https://cwiki.apache.org/confluence/display/KAFKA/KIP-735%3A+Increase+default+consumer+session+timeout
   
   Since then, we are observing sporadic system test failures due to rebalances 
taking
   longer than the test timeout. Rather than increase the test wait times, we 
can 
   just override the session timeout to a value more appropriate in the testing 
domain.
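
As a rough illustration of such an override, the sketch below copies a base configuration and pins the consumer's `session.timeout.ms` to 10 seconds. The helper `TestConsumerOverrides` is hypothetical; how the PR itself applies the override in the system tests is not shown here.

```java
import java.util.Properties;

// Hypothetical helper for test setups; not part of Kafka.
final class TestConsumerOverrides {
    // "session.timeout.ms" is the consumer config key; 10000 ms mirrors
    // the 10s mentioned in the PR title.
    static Properties withShortSessionTimeout(final Properties base) {
        final Properties props = new Properties();
        props.putAll(base);                        // leave the caller's config untouched
        props.put("session.timeout.ms", "10000");
        return props;
    }
}
```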
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[GitHub] [kafka] mjsax commented on a change in pull request #11235: KAFKA-13216: write correct tombstones into stream-stream join store changelog

2021-08-19 Thread GitBox


mjsax commented on a change in pull request #11235:
URL: https://github.com/apache/kafka/pull/11235#discussion_r692305627



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeySchema.java
##
@@ -72,27 +74,47 @@ public long segmentTimestamp(final Bytes key) {
 }
 
 /**
- * {@inheritdoc}
+ * {@inheritDoc}
  *
  * This method is optimized for {@link 
RocksDBTimeOrderedWindowStore#all()} only. Key and time
  * range queries are not supported.
  */
 @Override
 public HasNextCondition hasNextCondition(final Bytes binaryKeyFrom, final 
Bytes binaryKeyTo, final long from, final long to) {
-if (binaryKeyFrom != null || binaryKeyTo != null) {
-throw new IllegalArgumentException("binaryKeyFrom/binaryKeyTo keys 
cannot be non-null. Key and time range queries are not supported.");
+if (binaryKeyFrom == null && binaryKeyTo == null && from == 0 && to == 
Long.MAX_VALUE) {
+return Iterator::hasNext;
 }
 
-if (from != 0 && to != Long.MAX_VALUE) {
-throw new IllegalArgumentException("from/to time ranges should be 
0 to Long.MAX_VALUE. Key and time range queries are not supported.");
+if (binaryKeyFrom != null && binaryKeyFrom.equals(binaryKeyTo) && from 
== to) {
+
+return iterator -> {
+while (iterator.hasNext()) {
+final Bytes bytes = iterator.peekNextKey();
+final Bytes keyBytes = Bytes
+
.wrap(TimeOrderedKeySchema.extractStoreKeyBytes(bytes.get()));
+final long time = 
TimeOrderedKeySchema.extractStoreTimestamp(bytes.get());
+if (keyBytes.compareTo(binaryKeyFrom) >= 0
+&& keyBytes.compareTo(binaryKeyTo) <= 0
+&& time >= from
+&& time <= to) {
+return true;
+}
+iterator.next();
+}
+return false;
+};
 }
 
-return iterator -> iterator.hasNext();
+throw new IllegalArgumentException("Key and time range queries are not 
supported.");
 }
 
 @Override
 public  List segmentsToSearch(final Segments 
segments, final long from, final long to, final boolean forward) {
-throw new UnsupportedOperationException();
+if (from != to) {
+throw new IllegalArgumentException("");

Review comment:
   Oops. This one slipped through...








[GitHub] [kafka] mjsax commented on a change in pull request #11235: KAFKA-13216: write correct tombstones into stream-stream join store changelog

2021-08-19 Thread GitBox


mjsax commented on a change in pull request #11235:
URL: https://github.com/apache/kafka/pull/11235#discussion_r692305287



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeySchema.java
##
@@ -43,12 +45,12 @@
 
 @Override
 public Bytes upperRange(final Bytes key, final long to) {
-throw new UnsupportedOperationException();
+return null;

Review comment:
   Works for me. @guozhangwang WDYT?








[jira] [Commented] (KAFKA-4669) KafkaProducer.flush hangs when NetworkClient.handleCompletedReceives throws exception

2021-08-19 Thread BurningIce (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17401736#comment-17401736
 ] 

BurningIce commented on KAFKA-4669:
---


I'm using kafka-clients 2.5.0 and am also encountering this issue on the consumer side.

I turned on trace-level logging for Kafka after the error occurred (not before).
It seems that the correlation id in the preceding log line, "Using older server API v7 
to send OFFSET_COMMIT ... with correlation id", does not match the one 
that appears in the exception message.
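
For readers unfamiliar with the check behind that exception, here is a simplified sketch of correlation-id matching. It assumes responses on a connection arrive in the order the requests were sent; the class `InFlightRequests` below is a hypothetical illustration and not the actual `NetworkClient` code.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical, simplified model of per-connection in-flight request tracking.
final class InFlightRequests {
    private final Deque<Integer> pending = new ArrayDeque<>();
    private int nextCorrelationId = 0;

    // Records a request as in flight and returns its correlation id.
    int send() {
        final int id = nextCorrelationId++;
        pending.addLast(id);
        return id;
    }

    // A response must carry the id of the oldest request still in flight.
    void completeNext(final int responseCorrelationId) {
        final Integer expected = pending.peekFirst();
        if (expected == null) {
            throw new IllegalStateException(
                "No in-flight request for response " + responseCorrelationId);
        }
        if (expected != responseCorrelationId) {
            throw new IllegalStateException("Correlation id for response ("
                + responseCorrelationId + ") does not match request (" + expected + ")");
        }
        pending.pollFirst();
    }
}
```

In this model, a mismatch means the client's view of the oldest in-flight request and the response it just read have diverged, for example because an earlier response was never processed.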

Will the logs help?
[~rsivaram] 


 08-19 23:29:13 [pool-21-thread-1] TRACE o.a.k.c.c.i.Fetcher - [Consumer 
clientId=consumer-cg-apm-dc-wrap-jvm-1, groupId=cg-apm-dc-wrap-jvm] Skipping 
fetch for partition q-app-jvm-1 because previous request to apmkafka12:9092 
(id: 6 rack: null) has not been processed
08-19 23:29:13 [pool-21-thread-1] TRACE o.a.k.c.c.i.Fetcher - [Consumer 
clientId=consumer-cg-apm-dc-wrap-jvm-1, groupId=cg-apm-dc-wrap-jvm] Skipping 
fetch for partition q-app-jvm-1 because previous request to apmkafka12:9092 
(id: 6 rack: null) has not been processed
08-19 23:29:13 [pool-21-thread-1] TRACE o.a.k.c.c.i.Fetcher - [Consumer 
clientId=consumer-cg-apm-dc-wrap-jvm-1, groupId=cg-apm-dc-wrap-jvm] Skipping 
fetch for partition q-app-jvm-1 because previous request to apmkafka12:9092 
(id: 6 rack: null) has not been processed
08-19 23:29:13 [pool-21-thread-1] TRACE o.a.k.c.c.i.Fetcher - [Consumer 
clientId=consumer-cg-apm-dc-wrap-jvm-1, groupId=cg-apm-dc-wrap-jvm] Skipping 
fetch for partition q-app-jvm-1 because previous request to apmkafka12:9092 
(id: 6 rack: null) has not been processed
08-19 23:29:13 [pool-21-thread-1] DEBUG o.a.k.c.c.i.ConsumerCoordinator - 
[Consumer clientId=consumer-cg-apm-dc-wrap-jvm-1, groupId=cg-apm-dc-wrap-jvm] 
Sending asynchronous auto-commit of offsets 
{q-app-jvm-1=OffsetAndMetadata{offset=13500194, leaderEpoch=5, metadata=''}}
08-19 23:29:13 [pool-21-thread-1] TRACE o.a.k.c.c.i.ConsumerCoordinator - 
[Consumer clientId=consumer-cg-apm-dc-wrap-jvm-1, groupId=cg-apm-dc-wrap-jvm] 
Sending OffsetCommit request with 
{q-app-jvm-1=OffsetAndMetadata{offset=13500194, leaderEpoch=5, metadata=''}} to 
coordinator apmkafka10:9092 (id: 2147483643 rack: null)
08-19 23:29:13 [pool-21-thread-1] DEBUG o.a.k.c.NetworkClient - [Consumer 
clientId=consumer-cg-apm-dc-wrap-jvm-1, groupId=cg-apm-dc-wrap-jvm] Using older 
server API v7 to send OFFSET_COMMIT 
{group_id=cg-apm-dc-wrap-jvm,generation_id=222,member_id=consumer-cg-apm-dc-wrap-jvm-1-c9eec325-434f-4495-b040-7522d5c2ec87,group_instance_id=null,topics=[{name=q-app-jvm,partitions=[{partition_index=1,committed_offset=13500194,committed_leader_epoch=5,committed_metadata=}]}]}
 with correlation id 13993458 to node 2147483643
08-19 23:29:13 [pool-21-thread-1] TRACE o.a.k.c.c.i.Fetcher - [Consumer 
clientId=consumer-cg-apm-dc-wrap-jvm-1, groupId=cg-apm-dc-wrap-jvm] Skipping 
fetch for partition q-app-jvm-1 because previous request to apmkafka12:9092 
(id: 6 rack: null) has not been processed
08-19 23:29:13 [pool-21-thread-1] TRACE o.a.k.c.c.i.Fetcher - [Consumer 
clientId=consumer-cg-apm-dc-wrap-jvm-1, groupId=cg-apm-dc-wrap-jvm] Skipping 
fetch for partition q-app-jvm-1 because previous request to apmkafka12:9092 
(id: 6 rack: null) has not been processed
08-19 23:29:13 [pool-21-thread-1] TRACE o.a.k.c.c.i.Fetcher - [Consumer 
clientId=consumer-cg-apm-dc-wrap-jvm-1, groupId=cg-apm-dc-wrap-jvm] Skipping 
fetch for partition q-app-jvm-1 because previous request to apmkafka12:9092 
(id: 6 rack: null) has not been processed
08-19 23:29:13 [pool-21-thread-1] TRACE o.a.k.c.c.i.Fetcher - [Consumer 
clientId=consumer-cg-apm-dc-wrap-jvm-1, groupId=cg-apm-dc-wrap-jvm] Skipping 
fetch for partition q-app-jvm-1 because previous request to apmkafka12:9092 
(id: 6 rack: null) has not been processed
08-19 23:29:13 [pool-21-thread-1] TRACE o.a.k.c.c.i.Fetcher - [Consumer 
clientId=consumer-cg-apm-dc-wrap-jvm-1, groupId=cg-apm-dc-wrap-jvm] Skipping 
fetch for partition q-app-jvm-1 because previous request to apmkafka12:9092 
(id: 6 rack: null) has not been processed
08-19 23:29:13 [pool-21-thread-1] TRACE o.a.k.c.c.i.Fetcher - [Consumer 
clientId=consumer-cg-apm-dc-wrap-jvm-1, groupId=cg-apm-dc-wrap-jvm] Skipping 
fetch for partition q-app-jvm-1 because previous request to apmkafka12:9092 
(id: 6 rack: null) has not been processed
08-19 23:29:13 [pool-21-thread-1] TRACE o.a.k.c.c.i.Fetcher - [Consumer 
clientId=consumer-cg-apm-dc-wrap-jvm-1, groupId=cg-apm-dc-wrap-jvm] Skipping 
fetch for partition q-app-jvm-1 because previous request to apmkafka12:9092 
(id: 6 rack: null) has not been processed
08-19 23:29:13 [pool-21-thread-1] ERROR c.n.m.k.KafkaTopicConsumer - unexpected 
error to get message from kafka: Correlation id for response (13993458) does 
not match request (13993456), request header: 
RequestHeader(apiKey=OFFSET_COMMIT, apiVersion=7, 

[GitHub] [kafka] spena commented on a change in pull request #11235: KAFKA-13216: write correct tombstones into stream-stream join store changelog

2021-08-19 Thread GitBox


spena commented on a change in pull request #11235:
URL: https://github.com/apache/kafka/pull/11235#discussion_r692180473



##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
##
@@ -214,6 +223,128 @@ public void testLeftJoinDuplicates() {
 }
 }
 
+@Test
+public void shouldSendTombstoneForLeftJoinCandidatesRocksDb() {

Review comment:
   Could you add a similar test to the KStreamKStreamOuterJoinTest?

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeySchema.java
##
@@ -72,27 +74,47 @@ public long segmentTimestamp(final Bytes key) {
 }
 
 /**
- * {@inheritdoc}
+ * {@inheritDoc}
  *
  * This method is optimized for {@link 
RocksDBTimeOrderedWindowStore#all()} only. Key and time
  * range queries are not supported.
  */
 @Override
 public HasNextCondition hasNextCondition(final Bytes binaryKeyFrom, final 
Bytes binaryKeyTo, final long from, final long to) {
-if (binaryKeyFrom != null || binaryKeyTo != null) {
-throw new IllegalArgumentException("binaryKeyFrom/binaryKeyTo keys 
cannot be non-null. Key and time range queries are not supported.");
+if (binaryKeyFrom == null && binaryKeyTo == null && from == 0 && to == 
Long.MAX_VALUE) {
+return Iterator::hasNext;
 }
 
-if (from != 0 && to != Long.MAX_VALUE) {
-throw new IllegalArgumentException("from/to time ranges should be 
0 to Long.MAX_VALUE. Key and time range queries are not supported.");
+if (binaryKeyFrom != null && binaryKeyFrom.equals(binaryKeyTo) && from 
== to) {
+
+return iterator -> {
+while (iterator.hasNext()) {
+final Bytes bytes = iterator.peekNextKey();
+final Bytes keyBytes = Bytes
+
.wrap(TimeOrderedKeySchema.extractStoreKeyBytes(bytes.get()));
+final long time = 
TimeOrderedKeySchema.extractStoreTimestamp(bytes.get());
+if (keyBytes.compareTo(binaryKeyFrom) >= 0
+&& keyBytes.compareTo(binaryKeyTo) <= 0
+&& time >= from
+&& time <= to) {
+return true;
+}
+iterator.next();
+}
+return false;
+};
 }
 
-return iterator -> iterator.hasNext();
+throw new IllegalArgumentException("Key and time range queries are not 
supported.");
 }
 
 @Override
 public  List segmentsToSearch(final Segments 
segments, final long from, final long to, final boolean forward) {

Review comment:
   Add some tests in TimeOrderedKeySchemaTest.
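
A test of the kind requested would mainly need to pin down the skip-until-match behaviour of the new lambda. The sketch below reproduces only the shape of that logic on plain timestamps; `PeekingTimestampIterator` and `HasNextConditions` are hypothetical stand-ins, not the store classes from the PR.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Predicate;

// Hypothetical stand-in for a store iterator that can peek at its next entry.
final class PeekingTimestampIterator implements Iterator<Long> {
    private final List<Long> timestamps;
    private int position = 0;

    PeekingTimestampIterator(final Long... timestamps) {
        this.timestamps = Arrays.asList(timestamps);
    }

    @Override
    public boolean hasNext() {
        return position < timestamps.size();
    }

    @Override
    public Long next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return timestamps.get(position++);
    }

    long peekNext() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return timestamps.get(position);
    }
}

final class HasNextConditions {
    // Same shape as the lambda in the diff: advance past entries outside
    // [from, to] and stop at the first one inside the range.
    static Predicate<PeekingTimestampIterator> inTimeRange(final long from, final long to) {
        return iterator -> {
            while (iterator.hasNext()) {
                final long time = iterator.peekNext();
                if (time >= from && time <= to) {
                    return true;
                }
                iterator.next();
            }
            return false;
        };
    }
}
```

Note that evaluating the condition has a side effect: non-matching entries are consumed, so the iterator is left positioned on the first match.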

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeySchema.java
##
@@ -72,27 +74,47 @@ public long segmentTimestamp(final Bytes key) {
 }
 
 /**
- * {@inheritdoc}
+ * {@inheritDoc}
  *
  * This method is optimized for {@link 
RocksDBTimeOrderedWindowStore#all()} only. Key and time
  * range queries are not supported.
  */
 @Override
 public HasNextCondition hasNextCondition(final Bytes binaryKeyFrom, final 
Bytes binaryKeyTo, final long from, final long to) {

Review comment:
   TimeOrderedKeySchemaTest

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeySchema.java
##
@@ -72,27 +74,47 @@ public long segmentTimestamp(final Bytes key) {
 }
 
 /**
- * {@inheritdoc}
+ * {@inheritDoc}
  *
  * This method is optimized for {@link 
RocksDBTimeOrderedWindowStore#all()} only. Key and time
  * range queries are not supported.
  */
 @Override
 public HasNextCondition hasNextCondition(final Bytes binaryKeyFrom, final 
Bytes binaryKeyTo, final long from, final long to) {
-if (binaryKeyFrom != null || binaryKeyTo != null) {
-throw new IllegalArgumentException("binaryKeyFrom/binaryKeyTo keys 
cannot be non-null. Key and time range queries are not supported.");
+if (binaryKeyFrom == null && binaryKeyTo == null && from == 0 && to == 
Long.MAX_VALUE) {
+return Iterator::hasNext;
 }
 
-if (from != 0 && to != Long.MAX_VALUE) {
-throw new IllegalArgumentException("from/to time ranges should be 
0 to Long.MAX_VALUE. Key and time range queries are not supported.");
+if (binaryKeyFrom != null && binaryKeyFrom.equals(binaryKeyTo) && from 
== to) {
+
+return iterator -> {
+while (iterator.hasNext()) {
+final Bytes bytes = iterator.peekNextKey();
+final Bytes keyBytes = Bytes
+
.wrap(TimeOrderedKeySchema.extractStoreKeyBytes(bytes.get()));
+final long time = 

[GitHub] [kafka] dajac commented on pull request #11229: KAFKA-12961; Verify group generation in `DelayedJoin`

2021-08-19 Thread GitBox


dajac commented on pull request #11229:
URL: https://github.com/apache/kafka/pull/11229#issuecomment-902020042


   > Thanks for the PR. It makes sense to me. Would it be possible to add a 
test for the case when a late `DelayedJoin` completes?
   
   Yeah, let me see if I can add something.






[GitHub] [kafka] dajac commented on a change in pull request #11229: KAFKA-12961; Verify group generation in `DelayedJoin`

2021-08-19 Thread GitBox


dajac commented on a change in pull request #11229:
URL: https://github.com/apache/kafka/pull/11229#discussion_r692245978



##
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##
@@ -1391,78 +1392,92 @@ class GroupCoordinator(val brokerId: Int,
 }
   }
 
-  def tryCompleteJoin(group: GroupMetadata, forceComplete: () => Boolean): 
Boolean = {
+  def tryCompleteJoin(
+group: GroupMetadata,
+generationId: Int,
+forceComplete: () => Boolean
+  ): Boolean = {
 group.inLock {
-  if (group.hasAllMembersJoined)
+  if (generationId != group.generationId) {
+forceComplete()
+  } else if (group.hasAllMembersJoined) {
 forceComplete()
-  else false
+  } else false
 }
   }
 
-  def onCompleteJoin(group: GroupMetadata): Unit = {
+  def onCompleteJoin(
+group: GroupMetadata,
+generationId: Int
+  ): Unit = {
 group.inLock {
-  val notYetRejoinedDynamicMembers = 
group.notYetRejoinedMembers.filterNot(_._2.isStaticMember)
-  if (notYetRejoinedDynamicMembers.nonEmpty) {
-info(s"Group ${group.groupId} removed dynamic members " +
-  s"who haven't joined: ${notYetRejoinedDynamicMembers.keySet}")
-
-notYetRejoinedDynamicMembers.values.foreach { failedMember =>
-  group.remove(failedMember.memberId)
-  removeHeartbeatForLeavingMember(group, failedMember.memberId)
-}
-  }
-
-  if (group.is(Dead)) {
-info(s"Group ${group.groupId} is dead, skipping rebalance stage")
-  } else if (!group.maybeElectNewJoinedLeader() && 
group.allMembers.nonEmpty) {
-// If all members are not rejoining, we will postpone the completion
-// of rebalance preparing stage, and send out another delayed operation
-// until session timeout removes all the non-responsive members.
-error(s"Group ${group.groupId} could not complete rebalance because no 
members rejoined")
-rebalancePurgatory.tryCompleteElseWatch(
-  new DelayedJoin(this, group, group.rebalanceTimeoutMs),
-  Seq(GroupJoinKey(group.groupId)))
+  if (generationId != group.generationId) {
+error(s"Received unexpected notification of join complete for 
${group.groupId} " +
+  s"with an old generation $generationId while the group has 
${group.generationId}.")
   } else {
-group.initNextGeneration()
-if (group.is(Empty)) {
-  info(s"Group ${group.groupId} with generation ${group.generationId} 
is now empty " +
-
s"(${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor(group.groupId)})")
+val notYetRejoinedDynamicMembers = 
group.notYetRejoinedMembers.filterNot(_._2.isStaticMember)

Review comment:
   That's correct.
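
The generation guard in the diff above can be sketched in isolation as follows. This is a simplified Java model, not the Scala coordinator code: `GroupSketch` and `DelayedJoinSketch` are hypothetical, and incrementing `generationId` merely stands in for `initNextGeneration()`.

```java
// Hypothetical, simplified stand-ins for the coordinator types in the diff.
final class GroupSketch {
    int generationId = 1;
    boolean allMembersJoined = false;
}

final class DelayedJoinSketch {
    private final GroupSketch group;
    private final int generationId;   // generation captured when the operation was created
    private boolean completed = false;

    DelayedJoinSketch(final GroupSketch group) {
        this.group = group;
        this.generationId = group.generationId;
    }

    // Completes if the group has moved on (stale operation) or everyone has joined.
    boolean tryComplete() {
        synchronized (group) {
            if (generationId != group.generationId || group.allMembersJoined) {
                return forceComplete();
            }
            return false;
        }
    }

    private boolean forceComplete() {
        if (completed) {
            return false;
        }
        completed = true;
        onComplete();
        return true;
    }

    // A stale operation is dropped without starting another generation.
    private void onComplete() {
        synchronized (group) {
            if (generationId != group.generationId) {
                System.err.println("Ignoring join completion for old generation " + generationId);
            } else {
                group.generationId++;             // stands in for initNextGeneration()
                group.allMembersJoined = false;
            }
        }
    }
}
```

The point of the guard is that a late operation leaves the purgatory promptly but cannot trigger a second generation bump for a rebalance that has already finished.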








[GitHub] [kafka] akatona84 commented on pull request #11174: KAFKA-9747: Creating connect reconfiguration URL safely

2021-08-19 Thread GitBox


akatona84 commented on pull request #11174:
URL: https://github.com/apache/kafka/pull/11174#issuecomment-902006686


   > Thanks for the PR, it looks good. Is it possible to add a test for it?
   
   @mimaison, thanks for the review. Unfortunately, the code path where the 
REST client is used is not unit tested at all. It would take more effort to fully 
understand how the distributed herder could be tested better. 
   






[GitHub] [kafka] andricDu commented on pull request #8103: KAFKA-7061: KIP-280 Enhanced log compaction

2021-08-19 Thread GitBox


andricDu commented on pull request #8103:
URL: https://github.com/apache/kafka/pull/8103#issuecomment-902003678


   Bump.
   
   Status update on this feature?






[GitHub] [kafka] dajac commented on pull request #11230: KAFKA-12840; Removing `compact` cleaning on a topic should abort on-going compactions

2021-08-19 Thread GitBox


dajac commented on pull request #11230:
URL: https://github.com/apache/kafka/pull/11230#issuecomment-901980004


   Let me check those.






[GitHub] [kafka] lbradstreet commented on pull request #11230: KAFKA-12840; Removing `compact` cleaning on a topic should abort on-going compactions

2021-08-19 Thread GitBox


lbradstreet commented on pull request #11230:
URL: https://github.com/apache/kafka/pull/11230#issuecomment-901975481


   @dajac the updated changes look good to me overall. It seems like we're 
hitting some gradle executor errors across multiple runs, so it may not be a 
coincidence.






[GitHub] [kafka] vvcephei commented on pull request #11231: KAFKA-13214; Consumer should not reset state after retriable error in rebalance

2021-08-19 Thread GitBox


vvcephei commented on pull request #11231:
URL: https://github.com/apache/kafka/pull/11231#issuecomment-901975557


   Thanks @hachikuji !
   
   FYI: I ran three full batches of system tests on your PR branch, and did not 
see a failure in that test or any other consistent test failure:
   
   
http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2021-08-18--001.system-test-kafka-branch-builder--1629346306--hachikuji--KAFKA-13214--e5782597f3/report.html
   
   
http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2021-08-18--001.system-test-kafka-branch-builder--1629346035--hachikuji--KAFKA-13214--e5782597f3/report.html
   
   
http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2021-08-18--001.system-test-kafka-branch-builder--1629346129--hachikuji--KAFKA-13214--e5782597f3/report.html






[GitHub] [kafka] dajac commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path

2021-08-19 Thread GitBox


dajac commented on a change in pull request #11170:
URL: https://github.com/apache/kafka/pull/11170#discussion_r692175152



##
File path: core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
##
@@ -82,7 +82,7 @@ class AbstractFetcherThreadTest {
 
 // add one partition to create the consumer lag metric
 fetcher.setReplicaState(partition, 
MockFetcherThread.PartitionState(leaderEpoch = 0))
-fetcher.addPartitions(Map(partition -> initialFetchState(0L, leaderEpoch = 
0)))
+fetcher.addPartitions(Map(partition -> 
initialFetchState(Some(topicIds.get(partition.topic())),0L, leaderEpoch = 0)))

Review comment:
   nit: If we keep `topicIds` as a Scala map, we would get an Option 
directly with `topicIds.get(...)`. Also, a space is missing after the `,` and 
the `()` could be omitted ;).












[GitHub] [kafka] dajac commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path

2021-08-19 Thread GitBox


dajac commented on a change in pull request #11170:
URL: https://github.com/apache/kafka/pull/11170#discussion_r692169643



##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -1755,6 +1765,78 @@ class ReplicaManager(val config: KafkaConfig,
 partitionsToMakeFollower
   }
 
+  private def updateTopicIdForFollowers(controllerId: Int,
+controllerEpoch: Int,
+partitionStates: Map[Partition, 
LeaderAndIsrPartitionState],
+correlationId: Int,
+responseMap: 
mutable.Map[TopicPartition, Errors],
+topicIds: String => Option[Uuid]) : 
Set[Partition] = {
+val traceLoggingEnabled = stateChangeLogger.isTraceEnabled
+// Do we need this?
+partitionStates.forKeyValue { (partition, partitionState) =>
+  if (traceLoggingEnabled)
+stateChangeLogger.trace(s"Handling LeaderAndIsr request correlationId 
$correlationId from controller $controllerId " +
+  s"epoch $controllerEpoch starting the become-follower transition for 
partition ${partition.topicPartition} with leader " +
+  s"${partitionState.leader}")
+  responseMap.put(partition.topicPartition, Errors.NONE)
+}
+
+val partitionsToUpdateFollower: mutable.Set[Partition] = mutable.Set()
+try {
+  partitionStates.forKeyValue { (partition, partitionState) =>
+val newLeaderBrokerId = partitionState.leader
+  if (metadataCache.hasAliveBroker(newLeaderBrokerId)) {

Review comment:
   Intuitively, I would have thought that we don't need this. The broker 
is already a follower in this case, so the leader should be alive. What's your 
take on this?








[GitHub] [kafka] dajac commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path

2021-08-19 Thread GitBox


dajac commented on a change in pull request #11170:
URL: https://github.com/apache/kafka/pull/11170#discussion_r692168365



##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -1755,6 +1765,78 @@ class ReplicaManager(val config: KafkaConfig,
 partitionsToMakeFollower
   }
 
+  private def updateTopicIdForFollowers(controllerId: Int,
+controllerEpoch: Int,
+partitionStates: Map[Partition, 
LeaderAndIsrPartitionState],
+correlationId: Int,
+responseMap: 
mutable.Map[TopicPartition, Errors],
+topicIds: String => Option[Uuid]) : 
Set[Partition] = {
+val traceLoggingEnabled = stateChangeLogger.isTraceEnabled
+// Do we need this?
+partitionStates.forKeyValue { (partition, partitionState) =>
+  if (traceLoggingEnabled)
+stateChangeLogger.trace(s"Handling LeaderAndIsr request correlationId 
$correlationId from controller $controllerId " +
+  s"epoch $controllerEpoch starting the become-follower transition for 
partition ${partition.topicPartition} with leader " +
+  s"${partitionState.leader}")
+  responseMap.put(partition.topicPartition, Errors.NONE)
+}
+
+val partitionsToUpdateFollower: mutable.Set[Partition] = mutable.Set()

Review comment:
   nit: We could omit specifying the type here and use 
`mutable.Set.empty[Partition]`.








[GitHub] [kafka] dajac commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path

2021-08-19 Thread GitBox


dajac commented on a change in pull request #11170:
URL: https://github.com/apache/kafka/pull/11170#discussion_r692166774



##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -1755,6 +1765,78 @@ class ReplicaManager(val config: KafkaConfig,
 partitionsToMakeFollower
   }
 
+  private def updateTopicIdForFollowers(controllerId: Int,
+controllerEpoch: Int,
+partitionStates: Map[Partition, 
LeaderAndIsrPartitionState],
+correlationId: Int,
+responseMap: 
mutable.Map[TopicPartition, Errors],
+topicIds: String => Option[Uuid]) : 
Set[Partition] = {
+val traceLoggingEnabled = stateChangeLogger.isTraceEnabled
+// Do we need this?

Review comment:
   I don't think so. Those logs, which are exactly the same as the ones we have in 
`makeFollowers`, would be really confusing.








[GitHub] [kafka] dajac commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path

2021-08-19 Thread GitBox


dajac commented on a change in pull request #11170:
URL: https://github.com/apache/kafka/pull/11170#discussion_r692165645



##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -1755,6 +1765,78 @@ class ReplicaManager(val config: KafkaConfig,
 partitionsToMakeFollower
   }
 
+  private def updateTopicIdForFollowers(controllerId: Int,
+controllerEpoch: Int,
+partitionStates: Map[Partition, 
LeaderAndIsrPartitionState],
+correlationId: Int,
+responseMap: 
mutable.Map[TopicPartition, Errors],
+topicIds: String => Option[Uuid]) : 
Set[Partition] = {

Review comment:
   Is `topicIds` necessary? I suppose that we could get the `topicId` from 
the `Partition`, couldn't we?








[GitHub] [kafka] dajac commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path

2021-08-19 Thread GitBox


dajac commented on a change in pull request #11170:
URL: https://github.com/apache/kafka/pull/11170#discussion_r692164654



##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -1544,7 +1554,7 @@ class ReplicaManager(val config: KafkaConfig,
   // replica from source dir to destination dir
   logManager.abortAndPauseCleaning(topicPartition)
 
-  futureReplicasAndInitialOffset.put(topicPartition, 
InitialFetchState(leader,
+  futureReplicasAndInitialOffset.put(topicPartition, 
InitialFetchState(topicIds(topicPartition.topic()), leader,

Review comment:
   nit: parentheses are not required after `topic` ;)








[GitHub] [kafka] dajac commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path

2021-08-19 Thread GitBox


dajac commented on a change in pull request #11170:
URL: https://github.com/apache/kafka/pull/11170#discussion_r692164057



##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -1420,6 +1423,10 @@ class ReplicaManager(val config: KafkaConfig,
   }
   val partitionsToBeFollower = partitionStates.filter { case (k, _) => !partitionsToBeLeader.contains(k) }
 
+  val partitionsToUpdateIdForFollower = topicIdUpdatePartitions.filter { case (_, partitionState) =>
+partitionState.leader != localBrokerId
+  }

Review comment:
   Could we avoid having to re-iterate over `topicIdUpdatePartitions` by 
applying the condition before adding the partition to `topicIdUpdatePartitions`?
   
   I have opened a PR to do the same for the two collections above: 
https://github.com/apache/kafka/pull/11225
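
   A minimal sketch of the suggestion above, written in plain Java rather than against the actual `ReplicaManager` code. `PartitionState`, `needsTopicIdUpdate`, and both method names are hypothetical stand-ins, not Kafka APIs. The first method collects and then re-iterates; the second applies the leader check while collecting, so the intermediate collection is never walked a second time.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SinglePassFollowerFilter {
    // Hypothetical stand-in for a partition state; not the Kafka class.
    static final class PartitionState {
        final String topicPartition;
        final int leader;
        final boolean needsTopicIdUpdate;

        PartitionState(String topicPartition, int leader, boolean needsTopicIdUpdate) {
            this.topicPartition = topicPartition;
            this.leader = leader;
            this.needsTopicIdUpdate = needsTopicIdUpdate;
        }
    }

    // Two passes: first collect every partition needing a topic ID update,
    // then iterate over that collection again to keep only the followers.
    static List<String> collectThenFilter(List<PartitionState> states, int localBrokerId) {
        List<PartitionState> topicIdUpdatePartitions = new ArrayList<>();
        for (PartitionState state : states) {
            if (state.needsTopicIdUpdate) {
                topicIdUpdatePartitions.add(state);
            }
        }
        List<String> followers = new ArrayList<>();
        for (PartitionState state : topicIdUpdatePartitions) {
            if (state.leader != localBrokerId) {
                followers.add(state.topicPartition);
            }
        }
        return followers;
    }

    // One pass: apply the leader condition before adding to the collection.
    static List<String> filterWhileCollecting(List<PartitionState> states, int localBrokerId) {
        List<String> followers = new ArrayList<>();
        for (PartitionState state : states) {
            if (state.needsTopicIdUpdate && state.leader != localBrokerId) {
                followers.add(state.topicPartition);
            }
        }
        return followers;
    }

    public static void main(String[] args) {
        List<PartitionState> states = Arrays.asList(
            new PartitionState("foo-0", 1, true),
            new PartitionState("foo-1", 2, true),
            new PartitionState("bar-0", 3, false));
        System.out.println(filterWhileCollecting(states, 1)); // prints [foo-1]
    }
}
```

   Both variants return the same result; the single-pass one simply avoids the second traversal.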








[GitHub] [kafka] dajac commented on a change in pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path

2021-08-19 Thread GitBox


dajac commented on a change in pull request #11170:
URL: https://github.com/apache/kafka/pull/11170#discussion_r692162163



##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -1755,6 +1765,78 @@ class ReplicaManager(val config: KafkaConfig,
 partitionsToMakeFollower
   }
 
+  private def updateTopicIdForFollowers(controllerId: Int,

Review comment:
   Right. Reusing `makeFollowers` would not work out of the box.
   
   I am not sure I follow your point regarding the possibility of overriding a 
pending ISR state. I thought that only the leader updates the ISR state and that 
followers only update it via `updateAssignmentAndIsr` based on the controller 
state. I might be missing something.
   
   I am not against having `updateTopicIdForFollowers` but I wanted to ensure 
that we have thought about all the options.








[GitHub] [kafka] dajac commented on a change in pull request #11231: KAFKA-13214; Consumer should not reset state after retriable error in rebalance

2021-08-19 Thread GitBox


dajac commented on a change in pull request #11231:
URL: https://github.com/apache/kafka/pull/11231#discussion_r692154377



##
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -476,7 +476,6 @@ boolean joinGroupIfNeeded(final Timer timer) {
 else if (!future.isRetriable())
 throw exception;
 
-resetStateAndRejoin(String.format("rebalance failed with retriable error %s", exception));

Review comment:
   Your reasoning makes sense to me. From a first read, the PR looks pretty 
good. I will make a second pass on Monday to ensure that I cover all the cases.








[GitHub] [kafka] dajac commented on pull request #11230: KAFKA-12840; Removing `compact` cleaning on a topic should abort on-going compactions

2021-08-19 Thread GitBox


dajac commented on pull request #11230:
URL: https://github.com/apache/kafka/pull/11230#issuecomment-901717039


   @lbradstreet I have updated the PR based on your feedback.






[GitHub] [kafka] dengziming commented on a change in pull request #11173: MINOR: Support max timestamp in GetOffsetShell

2021-08-19 Thread GitBox


dengziming commented on a change in pull request #11173:
URL: https://github.com/apache/kafka/pull/11173#discussion_r691845925



##
File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala
##
@@ -224,9 +227,14 @@ object GetOffsetShell {
   /**
* Return the partition infos. Filter them with topicPartitionFilter.
*/
-  private def listPartitionInfos(consumer: KafkaConsumer[_, _], topicPartitionFilter: PartitionInfo => Boolean): Seq[PartitionInfo] = {
-consumer.listTopics.asScala.values.flatMap { partitions =>
-  partitions.asScala.filter(topicPartitionFilter)
+  private def listPartitionInfos(client: Admin, topicPartitionFilter: PartitionInfo => Boolean, listInternal: Boolean): Seq[PartitionInfo] = {
+val topics = client.listTopics(new ListTopicsOptions().listInternal(listInternal)).names().get().asScala

Review comment:
   I changed the method so that, instead of creating only a topic-partition 
filter, it creates both a topic filter and a topic-partition filter. The topic 
filter is used to filter topics before fetching, and the topic-partition filter 
is used to filter topic-partitions after receiving the topic metadata. PTAL.
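
   The two-stage filtering described above can be sketched as follows. This is an illustration only: the in-memory `cluster` map stands in for the metadata lookup, and `TwoStageFilter` and `listPartitions` are hypothetical names, not part of `GetOffsetShell` or the Admin client.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;
import java.util.function.Predicate;

class TwoStageFilter {
    // `cluster` is a hypothetical stand-in for cluster metadata:
    // topic name -> partition ids.
    static List<String> listPartitions(Map<String, List<Integer>> cluster,
                                       Predicate<String> topicFilter,
                                       BiPredicate<String, Integer> topicPartitionFilter) {
        // Stage 1: filter topic names before any per-topic metadata is looked up.
        List<String> selectedTopics = new ArrayList<>();
        for (String topic : cluster.keySet()) {
            if (topicFilter.test(topic)) {
                selectedTopics.add(topic);
            }
        }
        // Stage 2: look up metadata only for the selected topics,
        // then filter the individual topic-partitions.
        List<String> result = new ArrayList<>();
        for (String topic : selectedTopics) {
            for (Integer partition : cluster.get(topic)) {
                if (topicPartitionFilter.test(topic, partition)) {
                    result.add(topic + "-" + partition);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> cluster = new LinkedHashMap<>();
        cluster.put("orders", Arrays.asList(0, 1, 2));
        cluster.put("__consumer_offsets", Arrays.asList(0, 1));
        cluster.put("payments", Arrays.asList(0, 1));

        List<String> selected = listPartitions(
            cluster,
            topic -> !topic.startsWith("__"),      // skip internal-looking topics
            (topic, partition) -> partition == 0); // keep only partition 0
        System.out.println(selected); // prints [orders-0, payments-0]
    }
}
```

   Filtering topic names first means the more expensive per-topic lookup runs only for topics that can still match.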








[jira] [Commented] (KAFKA-12994) Migrate all Tests to New API and Remove Suppression for Deprecation Warnings related to KIP-633

2021-08-19 Thread Christo Lolov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17401493#comment-17401493
 ] 

Christo Lolov commented on KAFKA-12994:
---

Great, thanks for the reply! [~officialandyp], I will continue with the 
SessionWindowsTest and will leave SlidingWindowsTest to you. Let's continue 
with our current approach (separate PRs) and see how it goes :)

> Migrate all Tests to New API and Remove Suppression for Deprecation Warnings 
> related to KIP-633
> ---
>
> Key: KAFKA-12994
> URL: https://issues.apache.org/jira/browse/KAFKA-12994
> Project: Kafka
> Issue Type: Improvement
> Components: streams, unit tests
> Affects Versions: 3.0.0
> Reporter: Israel Ekpo
> Assignee: Andrew patterson
> Priority: Major
> Labels: kip-633, newbie, newbie++
> Fix For: 3.1.0
>
>
> Due to the API changes for KIP-633 a lot of deprecation warnings have been 
> generated in tests that are using the old deprecated APIs. There are a lot of 
> tests using the deprecated methods. We should absolutely migrate them all to 
> the new APIs and then get rid of all the applicable annotations for 
> suppressing the deprecation warnings.
> This applies to all Java and Scala examples and tests using the deprecated 
> APIs in the JoinWindows, SessionWindows, TimeWindows and SlidingWindows 
> classes.
>  
> This is based on the feedback from reviewers in this PR
>  
> https://github.com/apache/kafka/pull/10926



--
This message was sent by Atlassian Jira
(v8.3.4#803005)