[GitHub] [kafka] johnthotekat edited a comment on pull request #9133: KAFKA-10316: Updated Kafka Streams upgrade notes.
johnthotekat edited a comment on pull request #9133: URL: https://github.com/apache/kafka/pull/9133#issuecomment-670823191 Closing this PR. Raised another PR https://github.com/apache/kafka/pull/9146 . This branch got a little messed up during the rebase :( This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] johnthotekat commented on pull request #9133: KAFKA-10316: Updated Kafka Streams upgrade notes.
johnthotekat commented on pull request #9133: URL: https://github.com/apache/kafka/pull/9133#issuecomment-670823191 This branch got a little messed up during the rebase :( Raised another PR https://github.com/apache/kafka/pull/9146
[GitHub] [kafka] johnthotekat closed pull request #9133: KAFKA-10316: Updated Kafka Streams upgrade notes.
johnthotekat closed pull request #9133: URL: https://github.com/apache/kafka/pull/9133
[GitHub] [kafka] johnthotekat opened a new pull request #9146: KAFKA-10316 Updated Kafka Streams upgrade-guide.html
johnthotekat opened a new pull request #9146: URL: https://github.com/apache/kafka/pull/9146 * This PR updates the upgrade guide for the changes in https://github.com/apache/kafka/pull/9120. * Added the details on KIP-648 to 2.7.0 upgrade notes - docs/streams/upgrade-guide.html. @mjsax
[GitHub] [kafka] mjsax commented on a change in pull request #8964: KAFKA-9450: Decouple flushing state from commiting
mjsax commented on a change in pull request #8964: URL: https://github.com/apache/kafka/pull/8964#discussion_r467291346 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java ## @@ -93,9 +90,7 @@ public boolean isActive() { public void initializeIfNeeded() { if (state() == State.CREATED) { StateManagerUtil.registerStateStores(log, logPrefix, topology, stateMgr, stateDirectory, processorContext); - -// initialize the snapshot with the current offsets as we don't need to commit then until they change -offsetSnapshotSinceLastCommit = new HashMap<>(stateMgr.changelogOffsets()); +initializeCheckpoint(); Review comment: In the old code, we actually get a copy of the `Map`, while within `initializeCheckpoint()` we don't -- is this on purpose? Is it safe? Also, do we actually need the method? Wasn't the old code doing exactly the same thing? It's just a one-liner method -- what do we gain? ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java ## @@ -49,6 +59,30 @@ this.stateDirectory = stateDirectory; } +protected void initializeCheckpoint() { +// we will delete the local checkpoint file after registering the state stores and loading them into the +// state manager, therefore we should initialize the snapshot as empty to indicate over-write checkpoint needed Review comment: It seems the comment is outdated?
`we should initialize the snapshot as empty` ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java ## @@ -1638,21 +1689,22 @@ public void shouldCheckpointOffsetsOnPostCommitIfCommitNeeded() { task.suspend(); task.prepareCommit(); -task.postCommit(); +task.postCommit(false); assertEquals(Task.State.SUSPENDED, task.state()); EasyMock.verify(stateManager); } @Test -public void shouldSwallowExceptionOnCloseCleanError() { +public void shouldThrowExceptionOnCloseCleanError() { final long offset = 543L; EasyMock.expect(recordCollector.offsets()).andReturn(emptyMap()).anyTimes(); - stateManager.checkpoint(EasyMock.eq(Collections.singletonMap(partition1, offset))); -EasyMock.expectLastCall(); +stateManager.checkpoint(); +EasyMock.expectLastCall().andThrow(new AssertionError("Checkpoint should not be called")).anyTimes(); Review comment: as above? (more below... won't add comments each time) ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java ## @@ -412,7 +412,7 @@ public void shouldInitializeOffsetsFromCheckpointFile() throws IOException { stateMgr.registerStore(nonPersistentStore, nonPersistentStore.stateRestoreCallback); stateMgr.initializeStoreOffsetsFromCheckpoint(true); -assertFalse(checkpointFile.exists()); +assertTrue(checkpointFile.exists()); Review comment: Should we add a test for EOS, that the checkpoint file is deleted? 
## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java ## @@ -1259,13 +1262,47 @@ public void shouldReInitializeTopologyWhenResuming() throws IOException { } @Test -public void shouldCheckpointOffsetsOnCommit() { +public void shouldNotCheckpointOffsetsAgainOnCommitIfSnapshotNotChangedMuch() { final Long offset = 543L; EasyMock.expect(recordCollector.offsets()).andReturn(Collections.singletonMap(changelogPartition, offset)).anyTimes(); - stateManager.checkpoint(EasyMock.eq(Collections.singletonMap(changelogPartition, offset))); +stateManager.checkpoint(); +EasyMock.expectLastCall().once(); + EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.singleton(changelogPartition)); +EasyMock.expect(stateManager.changelogOffsets()) +.andReturn(Collections.singletonMap(changelogPartition, 0L)) +.andReturn(Collections.singletonMap(changelogPartition, 10L)) +.andReturn(Collections.singletonMap(changelogPartition, 20L)); +stateManager.registerStore(stateStore, stateStore.stateRestoreCallback); EasyMock.expectLastCall(); +EasyMock.replay(stateManager, recordCollector); + +task = createStatefulTask(createConfig(false, "100"), true); + +task.initializeIfNeeded(); +task.completeRestoration(); + +task.prepareCommit(); +task.postCommit(true); + +task.prepareCommit(); +task.postCommit(false); + +EasyMock.verify(recordCollector); Review comment: Should we verify `stateManager`, too? ## File path:
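mjsax's question about whether dropping the `new HashMap<>(...)` copy is safe can be illustrated with a toy model. The sketch assumes, without checking the Kafka Streams source, that the offsets map handed to the task could be a live, mutable map; `CheckpointTracker` and `checkpointNeeded` are hypothetical names, not the `AbstractTask`/`StandbyTask` code. It only shows that a snapshot which aliases the live map can never detect a change, so a "checkpoint only if the offsets moved" check would silently stop firing.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a task that remembers the changelog offsets it last
// checkpointed. Not the Kafka Streams implementation.
class CheckpointTracker {
    private final boolean copySnapshot;
    private Map<String, Long> snapshot;

    // copySnapshot = true mirrors `new HashMap<>(stateMgr.changelogOffsets())`;
    // copySnapshot = false keeps a reference to the live map instead.
    CheckpointTracker(Map<String, Long> liveOffsets, boolean copySnapshot) {
        this.copySnapshot = copySnapshot;
        this.snapshot = copySnapshot ? new HashMap<>(liveOffsets) : liveOffsets;
    }

    // Returns true if the live offsets differ from the last snapshot, then
    // records the live offsets as the new snapshot.
    boolean checkpointNeeded(Map<String, Long> liveOffsets) {
        boolean changed = !snapshot.equals(liveOffsets);
        snapshot = copySnapshot ? new HashMap<>(liveOffsets) : liveOffsets;
        return changed;
    }
}
```

If the offsets accessor already returns a fresh map on every call, both variants behave the same, which is presumably what the reviewer wants the author to confirm.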
[GitHub] [kafka] ning2008wisc commented on pull request #9145: KAFKA-10370: rewind consumer to SinkTaskContext's offsets when init
ning2008wisc commented on pull request #9145: URL: https://github.com/apache/kafka/pull/9145#issuecomment-670820575 As mentioned in https://stackoverflow.com/questions/54480715/no-current-assignment-for-partition-occurs-even-after-poll-in-kafka, another potential resolution could be to call consumer.seek(tp, offsets) in the **onPartitionsAssigned() callback** after subscribing.
[GitHub] [kafka] ning2008wisc opened a new pull request #9145: KAFKA-10370: rewind consumer to SinkTaskContext's offsets when init
ning2008wisc opened a new pull request #9145: URL: https://github.com/apache/kafka/pull/9145
[jira] [Updated] (KAFKA-10370) WorkerSinkTask: IllegalStateException cased by consumer.seek(tp, offsets) when (tp, offsets) are supplied by WorkerSinkTaskContext
[ https://issues.apache.org/jira/browse/KAFKA-10370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Zhang updated KAFKA-10370: --- Description: In WorkerSinkTask.java, when we want the consumer to consume from certain offsets rather than from the last committed offset, [WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L295] provides a way to supply the offsets from outside (e.g. an implementation of SinkTask) to rewind the consumer. In the [poll() method|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L312], it first calls "rewind()" to (1) read the offsets from WorkerSinkTaskContext and, if the offsets are not empty, (2) call consumer.seek(tp, offset) to rewind the consumer. When SinkTask first initializes (+start(Map<String, String> props)+), we call +"context.offset(offsets);+", and then in step (2) above we saw the following IllegalStateException: {code:java} [2020-08-07 23:53:55,752] INFO WorkerSinkTask{id=MirrorSinkConnector-0} Rewind test-1 to offset 3 (org.apache.kafka.connect.runtime.WorkerSinkTask:648) [2020-08-07 23:53:55,752] INFO [Consumer clientId=connector-consumer-MirrorSinkConnector-0, groupId=connect-MirrorSinkConnector] Seeking to offset 3 for partition test-1 (org.apache.kafka.clients.consumer.KafkaConsumer:1592) [2020-08-07 23:53:55,752] ERROR WorkerSinkTask{id=MirrorSinkConnector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:187) java.lang.IllegalStateException: No current assignment for partition test-1 at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:368) at org.apache.kafka.clients.consumer.internals.SubscriptionState.seekUnvalidated(SubscriptionState.java:385) at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1597) at
org.apache.kafka.connect.runtime.WorkerSinkTask.rewind(WorkerSinkTask.java:649) at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:334) at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:229) at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) [2020-08-07 23:53:55,752] ERROR WorkerSinkTask{id=MirrorSinkConnector-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:188) {code} As suggested in https://stackoverflow.com/questions/41008610/kafkaconsumer-0-10-java-api-error-message-no-current-assignment-for-partition/41010594, the resolution that has been initially verified in this case is to use *consumer.assign* with *consumer.seek*, instead of *consumer.subscribe*. was: In WorkerSinkTask.java, when we want the consumer to start consuming from certain offsets, rather than from the last committed offset, [WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L295] is used to carry the offsets from the outside world (e.g. an implementation of SinkTask). In the [poll() method|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L312], it first calls "rewind()" to (1) read the offsets from WorkerSinkTaskContext and (2) call consumer.seek(tp, offset) to rewind the consumer.
When running (2), we saw the following IllegalStateException: {code:java} [2020-08-07 23:53:55,752] INFO WorkerSinkTask{id=MirrorSinkConnector-0} Rewind test-1 to offset 3 (org.apache.kafka.connect.runtime.WorkerSinkTask:648) [2020-08-07 23:53:55,752] INFO [Consumer clientId=connector-consumer-MirrorSinkConnector-0, groupId=connect-MirrorSinkConnector] Seeking to offset 3 for partition test-1 (org.apache.kafka.clients.consumer.KafkaConsumer:1592) [2020-08-07 23:53:55,752] ERROR WorkerSinkTask{id=MirrorSinkConnector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:187) java.lang.IllegalStateException: No current assignment for partition test-1 at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:368) at
[jira] [Updated] (KAFKA-10370) WorkerSinkTask: IllegalStateException cased by consumer.seek(tp, offsets) when (tp, offsets) are supplied by WorkerSinkTaskContext
[ https://issues.apache.org/jira/browse/KAFKA-10370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Zhang updated KAFKA-10370: --- Description: In WorkerSinkTask.java, when we want the consumer to start consuming from certain offsets, rather than from the last committed offset, [WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L295] is used to carry the offsets from the outside world (e.g. an implementation of SinkTask). In the [poll() method|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L312], it first calls "rewind()" to (1) read the offsets from WorkerSinkTaskContext and (2) call consumer.seek(tp, offset) to rewind the consumer. When running (2), we saw the following IllegalStateException: {code:java} [2020-08-07 23:53:55,752] INFO WorkerSinkTask{id=MirrorSinkConnector-0} Rewind test-1 to offset 3 (org.apache.kafka.connect.runtime.WorkerSinkTask:648) [2020-08-07 23:53:55,752] INFO [Consumer clientId=connector-consumer-MirrorSinkConnector-0, groupId=connect-MirrorSinkConnector] Seeking to offset 3 for partition test-1 (org.apache.kafka.clients.consumer.KafkaConsumer:1592) [2020-08-07 23:53:55,752] ERROR WorkerSinkTask{id=MirrorSinkConnector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:187) java.lang.IllegalStateException: No current assignment for partition test-1 at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:368) at org.apache.kafka.clients.consumer.internals.SubscriptionState.seekUnvalidated(SubscriptionState.java:385) at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1597) at org.apache.kafka.connect.runtime.WorkerSinkTask.rewind(WorkerSinkTask.java:649) at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:334) at
org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:229) at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) [2020-08-07 23:53:55,752] ERROR WorkerSinkTask{id=MirrorSinkConnector-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:188) {code} As suggested in https://stackoverflow.com/questions/41008610/kafkaconsumer-0-10-java-api-error-message-no-current-assignment-for-partition/41010594, the resolution that has been initially verified is to use *consumer.assign* with *consumer.seek*, instead of *consumer.subscribe*. was: In WorkerSinkTask.java, when we want the consumer to start consuming from certain offsets, rather than from the last committed offset, [WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L295] is used to carry the offsets from the outside world (e.g. an implementation of SinkTask). In the [poll() method|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L312], it first calls "rewind()" to (1) read the offsets from WorkerSinkTaskContext and (2) call consumer.seek(tp, offset) to rewind the consumer.
When running (2), we saw the following IllegalStateException: {code:java} java.lang.IllegalStateException: No current assignment for partition mytopic-1 at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:251) at org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:276) at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1135) {code} As suggested in https://stackoverflow.com/questions/41008610/kafkaconsumer-0-10-java-api-error-message-no-current-assignment-for-partition/41010594, the resolution that has been initially verified is to use *consumer.assign* with *consumer.seek*, instead of *consumer.subscribe*. > WorkerSinkTask: IllegalStateException cased by consumer.seek(tp, offsets) > when (tp, offsets) are supplied by WorkerSinkTaskContext >
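The two workarounds discussed for KAFKA-10370 (seeking from the `onPartitionsAssigned()` callback after subscribing, or using `assign()` together with `seek()`) can be compared with a toy consumer. `ToyConsumer`, `simulateRebalance` and the plain-string partition names are hypothetical; the class only mimics the rule visible in the stack traces, namely that `seek()` throws `IllegalStateException` for a partition that is not currently assigned. In the real client the corresponding calls are `KafkaConsumer.subscribe(Collection<String>, ConsumerRebalanceListener)`, `ConsumerRebalanceListener.onPartitionsAssigned(Collection<TopicPartition>)`, `KafkaConsumer.assign(Collection<TopicPartition>)` and `KafkaConsumer.seek(TopicPartition, long)`.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

// Toy model of the consumer rule behind KAFKA-10370; NOT the KafkaConsumer API.
// Partitions are plain strings such as "test-1".
class ToyConsumer {
    private final Set<String> assigned = new HashSet<>();
    private final Map<String, Long> positions = new HashMap<>();
    // Stand-in for ConsumerRebalanceListener.onPartitionsAssigned().
    private Consumer<Set<String>> onPartitionsAssigned = partitions -> { };

    // Like subscribe(): registers the callback but assigns nothing yet.
    void subscribe(Consumer<Set<String>> onPartitionsAssigned) {
        this.onPartitionsAssigned = onPartitionsAssigned;
    }

    // Like assign(): the manual assignment takes effect immediately.
    void assign(Set<String> partitions) {
        assigned.clear();
        assigned.addAll(partitions);
    }

    // Stands in for the rebalance a real poll() would trigger after subscribe():
    // the assignment is installed first, then the callback runs.
    void simulateRebalance(Set<String> partitions) {
        assign(partitions);
        onPartitionsAssigned.accept(partitions);
    }

    // Like seek(): only valid for a currently assigned partition.
    void seek(String partition, long offset) {
        if (!assigned.contains(partition)) {
            throw new IllegalStateException("No current assignment for partition " + partition);
        }
        positions.put(partition, offset);
    }

    Long position(String partition) {
        return positions.get(partition);
    }
}
```

One trade-off worth keeping in mind: `assign()` takes the consumer out of consumer-group management, so partitions are no longer rebalanced automatically, whereas seeking from the rebalance callback keeps the subscription model that a sink task's consumer group presumably relies on.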
[GitHub] [kafka] cmccabe commented on a change in pull request #9144: KAFKA-9705: (part-1) add redirection fields in the request header
cmccabe commented on a change in pull request #9144: URL: https://github.com/apache/kafka/pull/9144#discussion_r467331218 ## File path: core/src/main/scala/kafka/network/SocketServer.scala ## @@ -88,15 +88,17 @@ class SocketServer(val config: KafkaConfig, private val memoryPoolDepletedTimeMetricName = metrics.metricName("MemoryPoolDepletedTimeTotal", MetricsGroup) memoryPoolSensor.add(new Meter(TimeUnit.MILLISECONDS, memoryPoolDepletedPercentMetricName, memoryPoolDepletedTimeMetricName)) private val memoryPool = if (config.queuedMaxBytes > 0) new SimpleMemoryPool(config.queuedMaxBytes, config.socketRequestMaxBytes, false, memoryPoolSensor) else MemoryPool.NONE - // data-plane - private val dataPlaneProcessors = new ConcurrentHashMap[Int, Processor]() - private[network] val dataPlaneAcceptors = new ConcurrentHashMap[EndPoint, Acceptor]() - val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, DataPlaneMetricPrefix, time) // control-plane private var controlPlaneProcessorOpt : Option[Processor] = None private[network] var controlPlaneAcceptorOpt : Option[Acceptor] = None val controlPlaneRequestChannelOpt: Option[RequestChannel] = config.controlPlaneListenerName.map(_ => -new RequestChannel(20, ControlPlaneMetricPrefix, time)) +new RequestChannel(20, ControlPlaneMetricPrefix, time, true)) + // data-plane + private val dataPlaneProcessors = new ConcurrentHashMap[Int, Processor]() + private[network] val dataPlaneAcceptors = new ConcurrentHashMap[EndPoint, Acceptor]() + // If the control plane processor is not defined, just set the flag to true in data plane to bypass the check for whether a given + // request is from the control plane or not. Review comment: I don't think this is quite right. The flag should be set based on whether we're in the control plane listener OR the inter-broker listener. It's much more common to use a separate inter-broker listener, than to use a fully separate control plane listener. 
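The review comment suggests deriving the flag from the listener a request arrives on (control plane listener OR inter-broker listener) rather than setting it to true on the whole data plane whenever no control plane listener exists. A minimal sketch of that rule, written in Java rather than the Scala of `SocketServer`; `ListenerFlags` and `fromPrivilegedListener` are hypothetical names and the listener names in the test are made up. The two configuration inputs correspond to the `control.plane.listener.name` and `inter.broker.listener.name` broker settings.

```java
import java.util.Optional;

// Hypothetical helper, not SocketServer code: should requests arriving on this
// listener be treated as coming from the control plane or inter-broker listener?
final class ListenerFlags {
    private ListenerFlags() { }

    static boolean fromPrivilegedListener(String listenerName,
                                          Optional<String> controlPlaneListenerName,
                                          String interBrokerListenerName) {
        // True only if a control plane listener is configured and this is it.
        boolean isControlPlane = controlPlaneListenerName
                .map(name -> name.equals(listenerName))
                .orElse(false);
        // Otherwise only the inter-broker listener is flagged.
        return isControlPlane || interBrokerListenerName.equals(listenerName);
    }
}
```

Under this rule a client-facing listener is never flagged, even in the common setup where no separate control plane listener is configured.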
[GitHub] [kafka] cmccabe commented on a change in pull request #9144: KAFKA-9705: (part-1) add redirection fields in the request header
cmccabe commented on a change in pull request #9144: URL: https://github.com/apache/kafka/pull/9144#discussion_r467330968 ## File path: core/src/main/scala/kafka/network/RequestChannel.scala ## @@ -309,7 +309,10 @@ object RequestChannel extends Logging { } } -class RequestChannel(val queueSize: Int, val metricNamePrefix : String, time: Time) extends KafkaMetricsGroup { +class RequestChannel(val queueSize: Int, + val metricNamePrefix : String, + time: Time, + val maybeFromControlPlane: Boolean) extends KafkaMetricsGroup { Review comment: Instead of `maybeFromControlPlane`, how about `fromControlPlaneListener`?
[GitHub] [kafka] cmccabe commented on a change in pull request #9144: KAFKA-9705: (part-1) add redirection fields in the request header
cmccabe commented on a change in pull request #9144: URL: https://github.com/apache/kafka/pull/9144#discussion_r467330705 ## File path: clients/src/main/java/org/apache/kafka/server/authorizer/AuthorizableRequestContext.java ## @@ -69,4 +69,11 @@ * Returns the correlation id from the request header. */ int correlationId(); + +/** + * Returns the initial principal name for a forwarded request. + */ +default String initialPrincipalName() { Review comment: Does this need to be here? I'm concerned that having it here will eventually lead to us using it for authorization, which it shouldn't be used for.
[GitHub] [kafka] cmccabe commented on a change in pull request #9144: KAFKA-9705: (part-1) add redirection fields in the request header
cmccabe commented on a change in pull request #9144: URL: https://github.com/apache/kafka/pull/9144#discussion_r467330529 ## File path: clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java ## @@ -37,11 +37,22 @@ public RequestHeader(Struct struct, short headerVersion) { } public RequestHeader(ApiKeys requestApiKey, short requestVersion, String clientId, int correlationId) { -this(new RequestHeaderData(). -setRequestApiKey(requestApiKey.id). -setRequestApiVersion(requestVersion). -setClientId(clientId). -setCorrelationId(correlationId), +this(requestApiKey, requestVersion, clientId, correlationId, null, null); +} + +public RequestHeader(ApiKeys requestApiKey, Review comment: I think it would be good to have a constructor that didn't have the two forwarding fields, to avoid changing a large number of tests.
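The suggestion amounts to the telescoping-constructor pattern: keep the existing four-argument constructor and let it delegate to the new one with `null` forwarding fields, so existing call sites and tests compile unchanged. A self-contained sketch; `SimpleRequestHeader` is a hypothetical stand-in for `RequestHeader` (a `short` API key instead of the `ApiKeys` enum, no `RequestHeaderData`), and the two field names follow the "initial principal name" and "initial client id" listed in the PR description.

```java
// Hypothetical stand-in for RequestHeader, reduced to plain fields.
final class SimpleRequestHeader {
    final short apiKey;
    final short apiVersion;
    final String clientId;
    final int correlationId;
    // Forwarding fields; null when the request was not forwarded.
    final String initialPrincipalName;
    final String initialClientId;

    // Existing signature kept, so current call sites and tests compile unchanged.
    SimpleRequestHeader(short apiKey, short apiVersion, String clientId, int correlationId) {
        this(apiKey, apiVersion, clientId, correlationId, null, null);
    }

    // New overload that also carries the forwarding fields.
    SimpleRequestHeader(short apiKey, short apiVersion, String clientId, int correlationId,
                        String initialPrincipalName, String initialClientId) {
        this.apiKey = apiKey;
        this.apiVersion = apiVersion;
        this.clientId = clientId;
        this.correlationId = correlationId;
        this.initialPrincipalName = initialPrincipalName;
        this.initialClientId = initialClientId;
    }
}
```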
[GitHub] [kafka] abbccdda opened a new pull request #9144: KAFKA-9705: (part-1) add redirection fields in the request header
abbccdda opened a new pull request #9144: URL: https://github.com/apache/kafka/pull/9144 Add the redirection supporting fields, including: 1. initial principal name 2. initial client id 3. the flag to indicate whether a given request is coming from the control plane in a secured environment. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Updated] (KAFKA-10371) Partition reassignments can result in crashed ReplicaFetcherThreads.
[ https://issues.apache.org/jira/browse/KAFKA-10371?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-10371: Affects Version/s: (was: 2.7.0) > Partition reassignments can result in crashed ReplicaFetcherThreads. > > > Key: KAFKA-10371 > URL: https://issues.apache.org/jira/browse/KAFKA-10371 > Project: Kafka > Issue Type: Bug > Components: core >Reporter: Steve Rodrigues >Assignee: David Jacot >Priority: Critical > > A Kafka system doing partition reassignments got stuck with the reassignment > partially done and the system with a non-zero number of URPs and increasing > max lag. > Looking in the logs, we see: > {noformat} > [ERROR] 2020-07-31 21:22:23,984 [ReplicaFetcherThread-0-3] > kafka.server.ReplicaFetcherThread - [ReplicaFetcher replicaId=4, leaderId=3, > fetcherId=0] Error due to > org.apache.kafka.common.errors.NotLeaderOrFollowerException: Error while > fetching partition state for foo > [INFO] 2020-07-31 21:22:23,986 [ReplicaFetcherThread-0-3] > kafka.server.ReplicaFetcherThread - [ReplicaFetcher replicaId=4, leaderId=3, > fetcherId=0] Stopped > {noformat} > Investigating further and with some helpful changes to the exception (which > was not generating a stack trace because it was a client-side exception), we > see on a test run: > {noformat} > [2020-08-06 19:58:21,592] ERROR [ReplicaFetcher replicaId=2, leaderId=1, > fetcherId=0] Error due to (kafka.server.ReplicaFetcherThread) > org.apache.kafka.common.errors.NotLeaderOrFollowerException: Error while > fetching partition state for topic-test-topic-85 > at org.apache.kafka.common.protocol.Errors.exception(Errors.java:415) > at > kafka.server.ReplicaManager.getPartitionOrException(ReplicaManager.scala:645) > at > kafka.server.ReplicaManager.localLogOrException(ReplicaManager.scala:672) > at > kafka.server.ReplicaFetcherThread.logStartOffset(ReplicaFetcherThread.scala:133) > at > 
kafka.server.ReplicaFetcherThread.$anonfun$buildFetch$1(ReplicaFetcherThread.scala:316) > at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:553) > at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:551) > at scala.collection.AbstractIterable.foreach(Iterable.scala:920) > at > kafka.server.ReplicaFetcherThread.buildFetch(ReplicaFetcherThread.scala:309) > {noformat} > It appears that the fetcher is attempting to fetch for a partition that has > been getting reassigned away. From further investigation, it seems that in > KAFKA-10002 the StopReplica code was changed from: > 1. Remove partition from fetcher > 2. Remove partition from partition map > to the other way around, but now the fetcher may race and attempt to build a > fetch for a partition that's no longer mapped. In particular, since the > logOrException code is being called from logStartOffset which isn't protected > against NotLeaderOrFollowerException, just against KafkaStorageException, the > exception isn't caught and throws all the way out, killing the replica > fetcher thread. > We need to switch this back. -- This message was sent by Atlassian Jira (v8.3.4#803005)
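The race described above depends only on the order of the two StopReplica steps, so it can be reproduced deterministically in a toy model by running a fetch between them. `ToyBroker`, `buildFetch` and `stopReplica` here are hypothetical simplifications, not the broker classes, and the toy throws `IllegalStateException` where the broker throws `NotLeaderOrFollowerException`.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Toy model of the KAFKA-10371 race; NOT broker code. Partitions are strings.
class ToyBroker {
    // Stand-in for the replica manager's partition map.
    final Set<String> partitionMap = ConcurrentHashMap.newKeySet();
    // Partitions the replica fetcher thread is currently fetching.
    final Set<String> fetcherPartitions = ConcurrentHashMap.newKeySet();

    void addReplica(String tp) {
        partitionMap.add(tp);
        fetcherPartitions.add(tp);
    }

    // Stands in for buildFetch(): each fetched partition is looked up in the
    // partition map; a missing entry throws (NotLeaderOrFollowerException in
    // the real broker, where it was uncaught and killed the fetcher thread).
    void buildFetch() {
        for (String tp : fetcherPartitions) {
            if (!partitionMap.contains(tp)) {
                throw new IllegalStateException("Error while fetching partition state for " + tp);
            }
        }
    }

    // StopReplica handling; concurrentFetch runs between the two removals to
    // model the fetcher thread building a fetch at the worst possible moment.
    void stopReplica(String tp, boolean removeFromFetcherFirst, Runnable concurrentFetch) {
        if (removeFromFetcherFirst) {
            fetcherPartitions.remove(tp); // 1. remove partition from fetcher
            concurrentFetch.run();        //    a fetch built now no longer includes tp
            partitionMap.remove(tp);      // 2. remove partition from partition map
        } else {
            partitionMap.remove(tp);      // order after KAFKA-10002
            concurrentFetch.run();        //    fetcher still owns tp, so the lookup fails
            fetcherPartitions.remove(tp);
        }
    }
}
```

With the fetcher-first order, a fetch built between the two steps simply no longer includes the partition, which is the ordering the issue says should be restored.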
[jira] [Resolved] (KAFKA-10371) Partition reassignments can result in crashed ReplicaFetcherThreads.
[ https://issues.apache.org/jira/browse/KAFKA-10371?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-10371. - Resolution: Fixed
[GitHub] [kafka] hachikuji merged pull request #9140: KAFKA-10371; Partition reassignments can result in crashed ReplicaFetcherThreads
hachikuji merged pull request #9140: URL: https://github.com/apache/kafka/pull/9140
[GitHub] [kafka] mjsax commented on pull request #9141: MINOR: Improve checks for CogroupedStreamAggregateBuilder
mjsax commented on pull request #9141: URL: https://github.com/apache/kafka/pull/9141#issuecomment-670745736 Retest this please.
[GitHub] [kafka] soondenana commented on pull request #9140: KAFKA-10371; Partition reassignments can result in crashed ReplicaFetcherThreads
soondenana commented on pull request #9140: URL: https://github.com/apache/kafka/pull/9140#issuecomment-670730240 I can confirm that this fixes the issue. I ran the system test 18 times and didn't hit the issue. Without this fix, I hit the issue 2 times out of 10 runs of the system test.
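As a rough sanity check on these numbers, make two simplifying assumptions: runs are independent, and the pre-fix failure rate is about 2/10. Under those assumptions, the chance of 18 consecutive passes if the bug were still present would be:

```latex
P(\text{18 passes} \mid p_{\text{fail}} = 0.2) = (1 - 0.2)^{18} = 0.8^{18} \approx 0.018
```

That is roughly a 2% chance, so the clean runs are unlikely, though not impossible, to be luck.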
[GitHub] [kafka] ableegoldman commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)
ableegoldman commented on a change in pull request #9039: URL: https://github.com/apache/kafka/pull/9039#discussion_r467296111 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java ## @@ -459,6 +460,95 @@ public void shouldGroupByKey() throws Exception { ))); } + + +@Test +public void shouldReduceSlidingWindows() throws Exception { +streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 0); +final long firstBatchTimestamp = 2000L; +final long timeDifference = 1000L; +produceMessages(firstBatchTimestamp); +final long secondBatchTimestamp = firstBatchTimestamp + timeDifference / 2; +produceMessages(secondBatchTimestamp); +final long thirdBatchTimestamp = secondBatchTimestamp + timeDifference - 100L; +produceMessages(thirdBatchTimestamp); + +final Serde<Windowed<String>> windowedSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class); +groupedStream + .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(timeDifference), ofMillis(2000L))) +.reduce(reducer) +.toStream() +.to(outputTopic, Produced.with(windowedSerde, Serdes.String())); + +startStreams(); + +final List<KeyValueTimestamp<Windowed<String>, String>> windowedOutput = receiveMessages( +new TimeWindowedDeserializer<>(), +new StringDeserializer(), +String.class, +25); + +// read from ConsoleConsumer +final String resultFromConsoleConsumer = readWindowedKeyedMessagesViaConsoleConsumer( +new TimeWindowedDeserializer(), +new StringDeserializer(), +String.class, +25, +true); + +final Comparator<KeyValueTimestamp<Windowed<String>, String>> comparator = +Comparator.comparing((KeyValueTimestamp<Windowed<String>, String> o) -> o.key().key()) +.thenComparing(KeyValueTimestamp::value); + +windowedOutput.sort(comparator); +final long firstBatchLeftWindow = firstBatchTimestamp - timeDifference; +final long firstBatchRightWindow = firstBatchTimestamp + 1; +final long secondBatchLeftWindow = secondBatchTimestamp - timeDifference; +final long secondBatchRightWindow = secondBatchTimestamp + 1; +final long 
thirdBatchLeftWindow = thirdBatchTimestamp - timeDifference; + +final List<KeyValueTimestamp<Windowed<String>, String>> expectResult = Arrays.asList( +new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(firstBatchLeftWindow, Long.MAX_VALUE)), "A", firstBatchTimestamp), Review comment: Clearly Kafka Streams is superior to the plain Consumer.
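The expected window boundaries in this test follow from simple arithmetic. With `timeDifference = 1000`, a record at time `t` gets a left window starting at `t - timeDifference` and ending at `t`, and its right window starts at `t + 1`. The stdlib-only sketch below recomputes the three batch timestamps and the boundaries the test derives from them. `SlidingWindowBoundsSketch` is a hypothetical helper, not part of Kafka Streams.

```java
// Hypothetical helper reproducing the timestamp arithmetic in shouldReduceSlidingWindows.
class SlidingWindowBoundsSketch {
    static final long TIME_DIFFERENCE = 1000L;

    // Batch timestamps, computed the same way as in the test.
    static long[] batchTimestamps() {
        long first = 2000L;
        long second = first + TIME_DIFFERENCE / 2;    // 2500
        long third = second + TIME_DIFFERENCE - 100L; // 3400
        return new long[] {first, second, third};
    }

    // Start of the left window [t - timeDifference, t] for a record at time t.
    static long leftWindowStart(long t) {
        return t - TIME_DIFFERENCE;
    }

    // Start of the right window for a record at time t.
    static long rightWindowStart(long t) {
        return t + 1;
    }

    public static void main(String[] args) {
        for (long t : batchTimestamps()) {
            System.out.printf("t=%d left=[%d, %d] rightStart=%d%n",
                t, leftWindowStart(t), t, rightWindowStart(t));
        }
    }
}
```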
[GitHub] [kafka] mjsax commented on a change in pull request #8964: KAFKA-9450: Decouple flushing state from commiting
mjsax commented on a change in pull request #8964: URL: https://github.com/apache/kafka/pull/8964#discussion_r466716421 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java ## @@ -49,6 +61,30 @@ this.stateDirectory = stateDirectory; } +protected void initializeCheckpoint() { +// we will delete the local checkpoint file after registering the state stores and loading them into the Review comment: Independent of this PR, might it be worth changing this to close this window of risk?
[GitHub] [kafka] skaundinya15 commented on a change in pull request #9142: MINOR: Fix delete_topic for system tests
skaundinya15 commented on a change in pull request #9142: URL: https://github.com/apache/kafka/pull/9142#discussion_r467274037 ## File path: tests/kafkatest/services/kafka/kafka.py ## @@ -503,7 +503,7 @@ def create_topic(self, topic_cfg, node=None, use_zk_to_create_topic=True): self.logger.info("Running topic creation command...\n%s" % cmd) node.account.ssh(cmd) -def delete_topic(self, topic, node=None): +def delete_topic(self, topic, node=None, use_zk_to_delete_topic=False): Review comment: Yeah, that's a good point, but in this case I was trying to stay consistent with the original implementation. Also, I'm not sure how far back we want to backport this, but if we backport it to a version where `--bootstrap-server` is not available, I think we would have to rewrite this function to account for that. And since we are on our way to deprecating and removing ZK, this would be a good way to start pushing us in that direction.
[GitHub] [kafka] dielhennr commented on a change in pull request #9101: KAFKA-10325: KIP-649 implementation
dielhennr commented on a change in pull request #9101: URL: https://github.com/apache/kafka/pull/9101#discussion_r467273920 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ## @@ -504,6 +515,7 @@ public void onSuccess(ByteBuffer value) { log.info("Successfully joined group with generation {}", generation.generationId); state = MemberState.STABLE; rejoinNeeded = false; +rebalanceConfig.coordinatorUpdated(); Review comment: The latest commit tries to address this.
[GitHub] [kafka] rondagostino commented on a change in pull request #9142: MINOR: Fix delete_topic for system tests
rondagostino commented on a change in pull request #9142: URL: https://github.com/apache/kafka/pull/9142#discussion_r467263885 ## File path: tests/kafkatest/services/kafka/kafka.py ## @@ -503,7 +503,7 @@ def create_topic(self, topic_cfg, node=None, use_zk_to_create_topic=True): self.logger.info("Running topic creation command...\n%s" % cmd) node.account.ssh(cmd) -def delete_topic(self, topic, node=None): +def delete_topic(self, topic, node=None, use_zk_to_delete_topic=False): Review comment: create/describe/list all use `use_zk_to_..._topic=True`, so I'm wondering whether it is best to remain consistent here. For example, when running an older version, might `--bootstrap-server` not be available?
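The question in this thread is which connection option `delete_topic` should pass to `kafka-topics.sh`. The real `kafka.py` is Python; the sketch below only models that choice in Java. The method name, its parameters, and the use of `--command-config` to supply client security settings are illustrative assumptions. The `--zookeeper`, `--bootstrap-server`, `--command-config`, `--delete`, and `--topic` options themselves are standard `kafka-topics.sh` flags in this era.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of how a test harness might build a topic-deletion command.
class DeleteTopicCommandSketch {
    static List<String> deleteTopicCommand(String topic,
                                           boolean useZkToDeleteTopic,
                                           String zkConnect,
                                           String bootstrapServers,
                                           String commandConfigPath) {
        List<String> cmd = new ArrayList<>();
        cmd.add("kafka-topics.sh");
        if (useZkToDeleteTopic) {
            // Talk to ZooKeeper directly, e.g. for older tool versions
            // that do not support --bootstrap-server.
            cmd.add("--zookeeper");
            cmd.add(zkConnect);
        } else {
            // Go through a broker; pass a client properties file (for example,
            // security settings) when one is provided.
            cmd.add("--bootstrap-server");
            cmd.add(bootstrapServers);
            if (commandConfigPath != null) {
                cmd.add("--command-config");
                cmd.add(commandConfigPath);
            }
        }
        cmd.add("--delete");
        cmd.add("--topic");
        cmd.add(topic);
        return cmd;
    }

    public static void main(String[] args) {
        System.out.println(String.join(" ", deleteTopicCommand(
            "test-topic", false, "zk:2181", "broker:9092", "/tmp/client.properties")));
    }
}
```

Defaulting the flag to `false` favors the broker path and its security settings. Defaulting it to `true` would match `create_topic` and keep older tool versions working, which is the trade-off discussed above.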
[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)
lct45 commented on a change in pull request #9039: URL: https://github.com/apache/kafka/pull/9039#discussion_r467260318 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -0,0 +1,389 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.kstream.Initializer; +import org.apache.kafka.streams.kstream.Window; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.SlidingWindows; +import org.apache.kafka.streams.processor.AbstractProcessor; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.TimestampedWindowStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.time.Instant; +import java.util.HashSet; +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor; +import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor; +import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull; + +public class KStreamSlidingWindowAggregate implements KStreamAggProcessorSupplier, V, Agg> { +private final Logger log = LoggerFactory.getLogger(getClass()); + +private final String storeName; +private final SlidingWindows windows; +private final Initializer initializer; +private final Aggregator aggregator; + +private boolean sendOldValues = false; + +public KStreamSlidingWindowAggregate(final SlidingWindows windows, + final String storeName, + final Initializer initializer, + final Aggregator aggregator) { +this.windows = windows; +this.storeName = storeName; 
+this.initializer = initializer; +this.aggregator = aggregator; +} + +@Override +public Processor get() { +return new KStreamSlidingWindowAggregateProcessor(); +} + +public SlidingWindows windows() { +return windows; +} + +@Override +public void enableSendingOldValues() { +sendOldValues = true; +} + +private class KStreamSlidingWindowAggregateProcessor extends AbstractProcessor { +private TimestampedWindowStore windowStore; +private TimestampedTupleForwarder, Agg> tupleForwarder; +private StreamsMetricsImpl metrics; +private InternalProcessorContext internalProcessorContext; +private Sensor lateRecordDropSensor; +private Sensor droppedRecordsSensor; +private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP; +private boolean reverseIteratorImplemented = false; + +@SuppressWarnings("unchecked") +@Override +public void init(final ProcessorContext context) { +super.init(context); +internalProcessorContext = (InternalProcessorContext) context; +metrics = internalProcessorContext.metrics(); +final String threadId = Thread.currentThread().getName(); +lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor( +threadId, +context.taskId().toString(), +internalProcessorContext.currentNode().name(), +metrics +); +//catch unsupported operation error +droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(threadId, context.taskId().toString(), metrics); +
[GitHub] [kafka] cmccabe commented on pull request #9103: Add redirection for (Incremental)AlterConfig
cmccabe commented on pull request #9103: URL: https://github.com/apache/kafka/pull/9103#issuecomment-670693346 I think it would be good to split this PR up a little bit. It seems like we could have a split like this: PR 1. `Add flag to the RequestContext` and `Add initial principal name` PR 2. Authorization changes for AlterConfigs / IncrementalAlterConfigs, forwarding if required, IBP check, bump RPC versions of AlterConfigs / IncrementalAlterConfigs PR 3. AdminClient changes in behavior based on versions of AlterConfigs / IncrementalAlterConfigs, AlterConfigsUtil, etc.
[GitHub] [kafka] mjsax commented on a change in pull request #9133: KAFKA-10316: Updated Kafka Streams upgrade notes.
mjsax commented on a change in pull request #9133: URL: https://github.com/apache/kafka/pull/9133#discussion_r467244495 ## File path: streams/src/main/java/org/apache/kafka/streams/KeyQueryMetadata.java ## @@ -50,32 +50,65 @@ public KeyQueryMetadata(final HostInfo activeHost, final Set<HostInfo> standbyHo } /** - * Get the Active streams instance for given key Review comment: Not sure. I am not a git guru... Can you maybe just rebase this PR against the latest trunk (just to make sure we don't mess anything up)?
[GitHub] [kafka] mjsax commented on pull request #9028: KAFKA-10035: Safer conversion of consumer timeout parameters
mjsax commented on pull request #9028: URL: https://github.com/apache/kafka/pull/9028#issuecomment-670689958 Retest this please.
[GitHub] [kafka] johnthotekat commented on a change in pull request #9133: KAFKA-10316: Updated Kafka Streams upgrade notes.
johnthotekat commented on a change in pull request #9133: URL: https://github.com/apache/kafka/pull/9133#discussion_r467236707 ## File path: streams/src/main/java/org/apache/kafka/streams/KeyQueryMetadata.java ## @@ -50,32 +50,65 @@ public KeyQueryMetadata(final HostInfo activeHost, final Set<HostInfo> standbyHo } /** - * Get the Active streams instance for given key Review comment: If you look at the commit history in the timeline, my last commit is https://github.com/apache/kafka/pull/9133/commits/14ab0529a492e9b1d361b1ab3081b8fa7189632f (it contains only the HTML changes), and all the commits before it are already merged to trunk. Am I missing something that makes the previous changes show up in this PR too?
[GitHub] [kafka] lct45 commented on a change in pull request #9141: MINOR: Improve checks for CogroupedStreamAggregateBuilder
lct45 commented on a change in pull request #9141: URL: https://github.com/apache/kafka/pull/9141#discussion_r467236081 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java ## @@ -47,18 +47,96 @@ CogroupedStreamAggregateBuilder(final InternalStreamsBuilder builder) { this.builder = builder; } - - KTable build(final Map, Aggregator> groupPatterns, + KTable buildNotWindowed(final Map, Aggregator> groupPatterns, final Initializer initializer, final NamedInternal named, final StoreBuilder storeBuilder, final Serde keySerde, final Serde valueSerde, - final String queryableName, - final Windows windows, - final SessionWindows sessionWindows, - final Merger sessionMerger) { + final String queryableName) { +build(groupPatterns, storeBuilder); +final Collection processors = new ArrayList<>(); +boolean stateCreated = false; +int counter = 0; +for (final Entry, Aggregator> kGroupedStream : groupPatterns.entrySet()) { +final StatefulProcessorNode statefulProcessorNode = getStatefulProcessorNode( +initializer, +named.suffixWithOrElseGet( +"-cogroup-agg-" + counter++, +builder, +CogroupedKStreamImpl.AGGREGATE_NAME), +stateCreated, +storeBuilder, +new KStreamAggregate<>(storeBuilder.name(), initializer, kGroupedStream.getValue())); +stateCreated = true; +processors.add(statefulProcessorNode); +builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), statefulProcessorNode); +} +return createTable(processors, named, keySerde, valueSerde, queryableName); +} + + KTable buildTimeWindows(final Map, Aggregator> groupPatterns, + final Initializer initializer, + final NamedInternal named, + final StoreBuilder storeBuilder, + final Serde keySerde, + final Serde valueSerde, + final String queryableName, + final Windows windows) { +build(groupPatterns, storeBuilder); +final Collection processors = new ArrayList<>(); +boolean stateCreated = false; +int counter = 0; +for (final Entry, Aggregator> kGroupedStream : 
groupPatterns.entrySet()) { +final StatefulProcessorNode statefulProcessorNode = getStatefulProcessorNode( +initializer, +named.suffixWithOrElseGet( +"-cogroup-agg-" + counter++, +builder, +CogroupedKStreamImpl.AGGREGATE_NAME), +stateCreated, +storeBuilder, +new KStreamWindowAggregate<>(windows, storeBuilder.name(), initializer, kGroupedStream.getValue())); +stateCreated = true; +processors.add(statefulProcessorNode); +builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), statefulProcessorNode); +} +return createTable(processors, named, keySerde, valueSerde, queryableName); +} + + KTable buildSessionWindows(final Map, Aggregator> groupPatterns, + final Initializer initializer, + final NamedInternal named, + final StoreBuilder storeBuilder, + final Serde keySerde, + final Serde valueSerde, + final String queryableName, + final SessionWindows sessionWindows, +
[GitHub] [kafka] johnthotekat commented on a change in pull request #9133: KAFKA-10316: Updated Kafka Streams upgrade notes.
johnthotekat commented on a change in pull request #9133: URL: https://github.com/apache/kafka/pull/9133#discussion_r467235567 ## File path: streams/src/main/java/org/apache/kafka/streams/KeyQueryMetadata.java ## @@ -50,32 +50,65 @@ public KeyQueryMetadata(final HostInfo activeHost, final Set<HostInfo> standbyHo } /** - * Get the Active streams instance for given key Review comment: Yeah, this should only update the HTML file. I raised the PR from the same branch. Err, I wonder: since these changes were already merged, should they even show up as changes here? I mean the previous changes, which were already merged.
[GitHub] [kafka] mjsax commented on a change in pull request #9133: KAFKA-10316: Updated Kafka Streams upgrade notes.
mjsax commented on a change in pull request #9133: URL: https://github.com/apache/kafka/pull/9133#discussion_r467233225 ## File path: streams/src/main/java/org/apache/kafka/streams/KeyQueryMetadata.java ## @@ -50,32 +50,65 @@ public KeyQueryMetadata(final HostInfo activeHost, final Set<HostInfo> standbyHo } /** - * Get the Active streams instance for given key Review comment: As the original PR is already merged, this seems redundant (the same goes for all the other code changes below). Shouldn't this PR only update the HTML file?
[GitHub] [kafka] mjsax commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)
mjsax commented on a change in pull request #9039: URL: https://github.com/apache/kafka/pull/9039#discussion_r467231847 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java ## @@ -459,6 +460,95 @@ public void shouldGroupByKey() throws Exception { ))); } + + +@Test +public void shouldReduceSlidingWindows() throws Exception { +streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 0); +final long firstBatchTimestamp = 2000L; +final long timeDifference = 1000L; +produceMessages(firstBatchTimestamp); +final long secondBatchTimestamp = firstBatchTimestamp + timeDifference / 2; +produceMessages(secondBatchTimestamp); +final long thirdBatchTimestamp = secondBatchTimestamp + timeDifference - 100L; +produceMessages(thirdBatchTimestamp); + +final Serde<Windowed<String>> windowedSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class); +groupedStream + .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(timeDifference), ofMillis(2000L))) +.reduce(reducer) +.toStream() +.to(outputTopic, Produced.with(windowedSerde, Serdes.String())); + +startStreams(); + +final List<KeyValueTimestamp<Windowed<String>, String>> windowedOutput = receiveMessages( +new TimeWindowedDeserializer<>(), +new StringDeserializer(), +String.class, +25); + +// read from ConsoleConsumer +final String resultFromConsoleConsumer = readWindowedKeyedMessagesViaConsoleConsumer( +new TimeWindowedDeserializer(), +new StringDeserializer(), +String.class, +25, +true); + +final Comparator<KeyValueTimestamp<Windowed<String>, String>> comparator = +Comparator.comparing((KeyValueTimestamp<Windowed<String>, String> o) -> o.key().key()) +.thenComparing(KeyValueTimestamp::value); + +windowedOutput.sort(comparator); +final long firstBatchLeftWindow = firstBatchTimestamp - timeDifference; +final long firstBatchRightWindow = firstBatchTimestamp + 1; +final long secondBatchLeftWindow = secondBatchTimestamp - timeDifference; +final long secondBatchRightWindow = secondBatchTimestamp + 1; +final long 
thirdBatchLeftWindow = thirdBatchTimestamp - timeDifference; + +final List<KeyValueTimestamp<Windowed<String>, String>> expectResult = Arrays.asList( +new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(firstBatchLeftWindow, Long.MAX_VALUE)), "A", firstBatchTimestamp), Review comment: Thanks. I missed that this trick of passing in the windowSize only works for KafkaStreams, when we pass in `Serdes` objects that are used as provided...
[GitHub] [kafka] skaundinya15 opened a new pull request #9143: MINOR: Fix the way total consumed is calculated for verifiable consumer
skaundinya15 opened a new pull request #9143: URL: https://github.com/apache/kafka/pull/9143 Currently, the way we calculate the total number of consumed messages for the verifiable consumer overcounts the messages that were actually consumed. This PR fixes the calculation so that consumed messages are counted correctly. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] skaundinya15 opened a new pull request #9142: MINOR: fixing delete topic for system tests
skaundinya15 opened a new pull request #9142: URL: https://github.com/apache/kafka/pull/9142 The current implementation of `delete_topic` in `kafka.py` doesn't take security credentials into account, so it fails when tests run with security enabled and try to delete a topic. This PR fixes that issue. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] ableegoldman commented on a change in pull request #9141: MINOR: Improve checks for CogroupedStreamAggregateBuilder
ableegoldman commented on a change in pull request #9141: URL: https://github.com/apache/kafka/pull/9141#discussion_r467210400 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java ## @@ -47,18 +47,96 @@ CogroupedStreamAggregateBuilder(final InternalStreamsBuilder builder) { this.builder = builder; } - - KTable build(final Map, Aggregator> groupPatterns, + KTable buildNotWindowed(final Map, Aggregator> groupPatterns, Review comment: WDYT about naming this just `build`? It's not as clear, but I think it's in line with the naming conventions elsewhere. For example we have `KStreamWindowAggregate` and `KStreamAggregate` (then maybe we can come up with a more descriptive name for the method currently called `build` ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java ## @@ -85,68 +163,39 @@ groupedStreams.remove(kGrouped); kGrouped.ensureCopartitionWith(groupedStreams); -final Collection processors = new ArrayList<>(); -boolean stateCreated = false; -int counter = 0; -for (final Entry, Aggregator> kGroupedStream : groupPatterns.entrySet()) { -final StatefulProcessorNode statefulProcessorNode = getStatefulProcessorNode( -kGroupedStream.getValue(), -initializer, -named.suffixWithOrElseGet( -"-cogroup-agg-" + counter++, -builder, -CogroupedKStreamImpl.AGGREGATE_NAME), -stateCreated, -storeBuilder, -windows, -sessionWindows, -sessionMerger); -stateCreated = true; -processors.add(statefulProcessorNode); -builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), statefulProcessorNode); -} +} + + KTable createTable(final Collection processors, + final NamedInternal named, + final Serde keySerde, + final Serde valueSerde, + final String queryableName) { final String mergeProcessorName = named.suffixWithOrElseGet( -"-cogroup-merge", -builder, -CogroupedKStreamImpl.MERGE_NAME); +"-cogroup-merge", +builder, +CogroupedKStreamImpl.MERGE_NAME); final 
ProcessorSupplier passThrough = new PassThrough<>(); final ProcessorGraphNode mergeNode = -new ProcessorGraphNode<>(mergeProcessorName, new ProcessorParameters<>(passThrough, mergeProcessorName)); +new ProcessorGraphNode<>(mergeProcessorName, new ProcessorParameters<>(passThrough, mergeProcessorName)); builder.addGraphNode(processors, mergeNode); return new KTableImpl( -mergeProcessorName, -keySerde, -valueSerde, -Collections.singleton(mergeNode.nodeName()), -queryableName, -passThrough, -mergeNode, -builder); +mergeProcessorName, +keySerde, +valueSerde, +Collections.singleton(mergeNode.nodeName()), +queryableName, +passThrough, +mergeNode, +builder); } -private StatefulProcessorNode getStatefulProcessorNode(final Aggregator aggregator, - final Initializer initializer, +private StatefulProcessorNode getStatefulProcessorNode(final Initializer initializer, Review comment: We can remove the `` now, right? Also it looks like the `initializer` input isn't needed anymore either ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java ## @@ -85,68 +163,39 @@ groupedStreams.remove(kGrouped); kGrouped.ensureCopartitionWith(groupedStreams); -final Collection processors = new ArrayList<>(); -boolean stateCreated = false; -int counter = 0; -for (final Entry, Aggregator> kGroupedStream : groupPatterns.entrySet()) { -final StatefulProcessorNode statefulProcessorNode = getStatefulProcessorNode( -kGroupedStream.getValue(), -initializer, -named.suffixWithOrElseGet( -"-cogroup-agg-" + counter++, -builder, -CogroupedKStreamImpl.AGGREGATE_NAME), -stateCreated, -storeBuilder, -
[GitHub] [kafka] abbccdda commented on a change in pull request #9103: Add redirection for (Incremental)AlterConfig
abbccdda commented on a change in pull request #9103: URL: https://github.com/apache/kafka/pull/9103#discussion_r467215647 ## File path: core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala ## @@ -497,10 +497,10 @@ class AuthorizerIntegrationTest extends BaseRequestTest { private def alterConfigsRequest = new AlterConfigsRequest.Builder( - Collections.singletonMap(new ConfigResource(ConfigResource.Type.TOPIC, tp.topic), -new AlterConfigsRequest.Config(Collections.singleton( - new AlterConfigsRequest.ConfigEntry(LogConfig.MaxMessageBytesProp, "100") -))), true).build() +Collections.singletonMap(new ConfigResource(ConfigResource.Type.TOPIC, tp.topic), + new AlterConfigsRequest.Config(Collections.singleton( +new AlterConfigsRequest.ConfigEntry(LogConfig.MaxMessageBytesProp, "100") + ))), true).build() Review comment: Let me check around.
[GitHub] [kafka] cmccabe commented on a change in pull request #9103: Add redirection for (Incremental)AlterConfig
cmccabe commented on a change in pull request #9103: URL: https://github.com/apache/kafka/pull/9103#discussion_r467214507 ## File path: core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala ## @@ -497,10 +497,10 @@ class AuthorizerIntegrationTest extends BaseRequestTest { private def alterConfigsRequest = new AlterConfigsRequest.Builder( - Collections.singletonMap(new ConfigResource(ConfigResource.Type.TOPIC, tp.topic), -new AlterConfigsRequest.Config(Collections.singleton( - new AlterConfigsRequest.ConfigEntry(LogConfig.MaxMessageBytesProp, "100") -))), true).build() +Collections.singletonMap(new ConfigResource(ConfigResource.Type.TOPIC, tp.topic), + new AlterConfigsRequest.Config(Collections.singleton( +new AlterConfigsRequest.ConfigEntry(LogConfig.MaxMessageBytesProp, "100") + ))), true).build() Review comment: Can we get rid of whitespace-only changes like this, or at least move them to another PR?
[GitHub] [kafka] mimaison commented on a change in pull request #9136: KAFKA-10211: Add DirectoryConfigProvider
mimaison commented on a change in pull request #9136: URL: https://github.com/apache/kafka/pull/9136#discussion_r467206725 ## File path: clients/src/main/java/org/apache/kafka/common/config/provider/DirectoryConfigProvider.java ## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.config.provider; + +import org.apache.kafka.common.config.ConfigData; +import org.apache.kafka.common.config.ConfigException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileFilter; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +/** + * An implementation of {@link ConfigProvider} based on a directory of files. + * Property keys correspond to the names of the regular (i.e. non-directory) + * files in a directory given by the path parameter. + * Property values are taken from the file contents corresponding to each key. 
+ */ +public class DirectoryConfigProvider implements ConfigProvider { + +private final Logger log = LoggerFactory.getLogger(getClass()); + +@Override +public void configure(Map<String, ?> configs) { } + +@Override +public void close() throws IOException { } + +/** + * Retrieves the data contained in regular files in the directory given by {@code path}. + * Non-regular files (such as directories) in the given directory are silently ignored. + * @param path the directory where data files reside. + * @return the configuration data. + */ +@Override +public ConfigData get(String path) { +return get(path, File::isFile); +} + +/** + * Retrieves the data contained in the regular files named by {@code keys} in the directory given by {@code path}. + * Non-regular files (such as directories) in the given directory are silently ignored. + * @param path the directory where data files reside. + * @param keys the keys whose values will be retrieved. + * @return the configuration data. + */ +@Override +public ConfigData get(String path, Set<String> keys) { +return get(path, pathname -> +pathname.isFile() +&& keys.contains(pathname.getName())); +} + +private ConfigData get(String path, FileFilter fileFilter) { +Map<String, String> map = new HashMap<>(); +if (path != null && !path.isEmpty()) { +File dir = new File(path); +if (!dir.isDirectory()) { +log.warn("The path {} is not a directory", path); +} else { +for (File file : dir.listFiles(fileFilter)) { Review comment: Should we use `java.nio.file.Files` here instead? That will also give us an Exception if an IO error happens instead of `null`.
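The review suggests replacing `File#listFiles`, which returns `null` when an I/O error occurs, with `java.nio.file.Files`, which throws instead. A minimal sketch of what that could look like, assuming the same semantics as the PR (regular files only, key = file name, value = file contents). The class and method names are hypothetical, decoding as UTF-8 is an assumption, and wrapping failures in `UncheckedIOException` (rather than Kafka's `ConfigException`) is chosen only to keep the example dependency-free.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Stream;

public class NioDirectoryReader {

    /**
     * Reads every regular file in {@code dir} whose name passes {@code keyFilter}
     * into a map of file name -> file contents (hypothetical helper).
     * Files.list throws IOException on failure instead of returning null,
     * so I/O errors are surfaced rather than silently dropped.
     */
    public static Map<String, String> read(Path dir, Predicate<String> keyFilter) {
        Map<String, String> map = new HashMap<>();
        if (!Files.isDirectory(dir)) {
            return map; // the PR logs a warning in this case
        }
        try (Stream<Path> entries = Files.list(dir)) {
            for (Path file : (Iterable<Path>) entries::iterator) {
                String name = file.getFileName().toString();
                // Skip directories and other non-regular files, as the PR does.
                if (Files.isRegularFile(file) && keyFilter.test(name)) {
                    map.put(name, new String(Files.readAllBytes(file), StandardCharsets.UTF_8));
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException("Could not read directory " + dir, e);
        }
        return map;
    }
}
```

Passing `k -> true` corresponds to `get(path)`, and `keys::contains` to `get(path, keys)`.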
[jira] [Commented] (KAFKA-10223) ReplicaNotAvailableException must be retriable to handle reassignments
[ https://issues.apache.org/jira/browse/KAFKA-10223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17173403#comment-17173403 ] Dongjoon Hyun commented on KAFKA-10223: --- Hi, [~rsivaram]. Is this only 2.6.0 issue? > ReplicaNotAvailableException must be retriable to handle reassignments > -- > > Key: KAFKA-10223 > URL: https://issues.apache.org/jira/browse/KAFKA-10223 > Project: Kafka > Issue Type: Bug > Components: clients >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Critical > Fix For: 2.6.0 > > > ReplicaNotAvailableException should be a retriable `InvalidMetadataException` > since consumers may throw this during reassignments. -- This message was sent by Atlassian Jira (v8.3.4#803005)
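The ticket asks for `ReplicaNotAvailableException` to become a retriable `InvalidMetadataException`, so that clients retry (after refreshing metadata) during reassignments instead of failing. A minimal sketch of why the class hierarchy matters: a caller that retries only on a retriable base type. The exception classes below are hypothetical toy stand-ins that mirror the hierarchy described in the ticket; they are not Kafka's classes, and the retry helper is illustrative only.

```java
import java.util.function.Supplier;

public class RetryOnRetriable {

    // Hypothetical stand-ins (not Kafka's classes): a retriable base type,
    // a metadata subtype, and the replica-not-available case placed under it.
    static class RetriableError extends RuntimeException {
        RetriableError(String msg) { super(msg); }
    }

    static class InvalidMetadataError extends RetriableError {
        InvalidMetadataError(String msg) { super(msg); }
    }

    static class ReplicaNotAvailableError extends InvalidMetadataError {
        ReplicaNotAvailableError(String msg) { super(msg); }
    }

    /**
     * Calls {@code action} up to {@code maxAttempts} times. Only retriable errors
     * are retried; any other exception propagates immediately. Metadata errors
     * additionally trigger {@code onMetadataError} (e.g. a metadata refresh).
     */
    static <T> T callWithRetries(Supplier<T> action, int maxAttempts, Runnable onMetadataError) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RetriableError last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.get();
            } catch (RetriableError e) {
                last = e;
                if (e instanceof InvalidMetadataError) {
                    onMetadataError.run();
                }
            }
        }
        throw last;
    }
}
```

If `ReplicaNotAvailableError` did not extend the retriable type, the same caller would fail on the first attempt, which is the behavior the ticket describes.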
[GitHub] [kafka] mimaison commented on pull request #8295: KAFKA-9627: Replace ListOffset request/response with automated protocol
mimaison commented on pull request #8295: URL: https://github.com/apache/kafka/pull/8295#issuecomment-670643947 @abbccdda I've addressed your comments, can you take another look?
[GitHub] [kafka] mimaison commented on a change in pull request #8295: KAFKA-9627: Replace ListOffset request/response with automated protocol
mimaison commented on a change in pull request #8295: URL: https://github.com/apache/kafka/pull/8295#discussion_r467194925 ## File path: clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java ## @@ -1221,28 +1226,39 @@ private DeleteGroupsResponse createDeleteGroupsResponse() { private ListOffsetRequest createListOffsetRequest(int version) { if (version == 0) { -Map<TopicPartition, ListOffsetRequest.PartitionData> offsetData = Collections.singletonMap( -new TopicPartition("test", 0), -new ListOffsetRequest.PartitionData(100L, 10)); +ListOffsetTopic topic = new ListOffsetTopic() Review comment: Not entirely sure. It's only used 3 times so we're not going to save very much.
[GitHub] [kafka] mimaison commented on a change in pull request #8295: KAFKA-9627: Replace ListOffset request/response with automated protocol
mimaison commented on a change in pull request #8295: URL: https://github.com/apache/kafka/pull/8295#discussion_r467193718 ## File path: clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java ## @@ -1251,15 +1267,28 @@ private ListOffsetRequest createListOffsetRequest(int version) { private ListOffsetResponse createListOffsetResponse(int version) { if (version == 0) { -Map<TopicPartition, ListOffsetResponse.PartitionData> responseData = new HashMap<>(); -responseData.put(new TopicPartition("test", 0), -new ListOffsetResponse.PartitionData(Errors.NONE, asList(100L))); -return new ListOffsetResponse(responseData); +ListOffsetResponseData data = new ListOffsetResponseData() Review comment: I'm not sure about adding extra methods to `ListOffsetResponse` just to remove a few lines in tests.
[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API
rondagostino commented on a change in pull request #9032: URL: https://github.com/apache/kafka/pull/9032#discussion_r467191680 ## File path: core/src/main/scala/kafka/admin/ConfigCommand.scala ## @@ -508,7 +563,15 @@ object ConfigCommand extends Config { val entityStr = (entitySubstr(ClientQuotaEntity.USER) ++ entitySubstr(ClientQuotaEntity.CLIENT_ID)).mkString(", ") val entriesStr = entries.asScala.map(e => s"${e._1}=${e._2}").mkString(", ") - println(s"Configs for ${entityStr} are ${entriesStr}") + println(s"Quota configs for ${entityStr} are ${entriesStr}") +} +// we describe user SCRAM credentials only when we are not describing client information +// and we are not given either --entity-default or --user-defaults +if (!entityTypes.contains(ConfigType.Client) && !entityNames.contains("")) { + getUserScramCredentialConfigs(adminClient, entityNames).foreach { case (user, description) => Review comment: I'm adding `core/src/test/scala/unit/kafka/admin/UserScramCredentialsCommandTest.scala` -- let me know if this test covers this case.
[GitHub] [kafka] hachikuji merged pull request #9110: MINOR: Ensure a reason is logged for all segment deletion operations
hachikuji merged pull request #9110: URL: https://github.com/apache/kafka/pull/9110
[GitHub] [kafka] lbradstreet commented on a change in pull request #7222: KAFKA-8806 Reduce calls to validateOffsetsIfNeeded
lbradstreet commented on a change in pull request #7222: URL: https://github.com/apache/kafka/pull/7222#discussion_r467187874 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java ## @@ -422,10 +421,13 @@ synchronized int numAssignedPartitions() { } synchronized List<TopicPartition> fetchablePartitions(Predicate<TopicPartition> isAvailable) { -return assignment.stream() -.filter(tpState -> isAvailable.test(tpState.topicPartition()) && tpState.value().isFetchable()) -.map(PartitionStates.PartitionState::topicPartition) -.collect(Collectors.toList()); +List<TopicPartition> result = new ArrayList<>(); Review comment: We should add a small comment that this is in the hot path and is written the "ugly" way for a reason. It's also probably worth mentioning that we do the cheap isFetchable check first.
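To illustrate the reviewer's point, here is a minimal sketch of the loop-based form with the cheap check first. `AssignedPartition` is a hypothetical stand-in for the real partition state (a topic-partition name plus an `isFetchable` flag), and the Javadoc inside is the kind of note the reviewer asks for, not the comment that was actually committed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class FetchablePartitionsSketch {

    // Hypothetical stand-in for the consumer's per-partition state.
    static final class AssignedPartition {
        final String topicPartition;
        final boolean fetchable;

        AssignedPartition(String topicPartition, boolean fetchable) {
            this.topicPartition = topicPartition;
            this.fetchable = fetchable;
        }
    }

    /**
     * Hot path: written as a plain loop rather than a stream pipeline to avoid
     * building stream objects on every call. The cheap {@code fetchable} flag is
     * checked before {@code isAvailable}, so the predicate is skipped for
     * partitions that cannot be fetched anyway.
     */
    static List<String> fetchablePartitions(List<AssignedPartition> assignment,
                                            Predicate<String> isAvailable) {
        List<String> result = new ArrayList<>();
        for (AssignedPartition p : assignment) {
            if (p.fetchable && isAvailable.test(p.topicPartition)) {
                result.add(p.topicPartition);
            }
        }
        return result;
    }
}
```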
[GitHub] [kafka] lbradstreet closed pull request #9123: MINOR: optimize fetchablePartitions check by performing cheap check first
lbradstreet closed pull request #9123: URL: https://github.com/apache/kafka/pull/9123
[GitHub] [kafka] hachikuji commented on a change in pull request #7222: KAFKA-8806 Reduce calls to validateOffsetsIfNeeded
hachikuji commented on a change in pull request #7222: URL: https://github.com/apache/kafka/pull/7222#discussion_r467143222 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ## @@ -1120,17 +1123,28 @@ Node selectReadReplica(TopicPartition partition, Node leaderReplica, long curren } } +/** + * If we have seen new metadata (as tracked by {@link org.apache.kafka.clients.Metadata#updateVersion()}), then + * we should check that all of the assignments have a valid position. + */ +private void maybeValidateAssignments() { Review comment: The name here threw me off because of its generality. I think we need to work metadata into the name to make it clearer. Maybe `validatePositionsOnMetadataChange` or `validatePositionsForLeaderChanges`.
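The Javadoc under review describes validating positions only after new metadata has been seen, as tracked by `Metadata#updateVersion()`. A minimal sketch of that gating, using one of the names the reviewer suggests. The `IntSupplier` standing in for the metadata version and the `validated` list standing in for the real per-partition position check are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntSupplier;

public class MetadataGatedValidation {
    private final IntSupplier metadataUpdateVersion; // stand-in for Metadata#updateVersion()
    private final List<String> assignedPartitions;
    private int lastSeenVersion = -1;

    // Records which partitions were validated; stands in for the real position check.
    final List<String> validated = new ArrayList<>();

    MetadataGatedValidation(IntSupplier metadataUpdateVersion, List<String> assignedPartitions) {
        this.metadataUpdateVersion = metadataUpdateVersion;
        this.assignedPartitions = new ArrayList<>(assignedPartitions);
    }

    /** Re-validates all assigned positions, but only if the metadata version changed. */
    void validatePositionsOnMetadataChange() {
        int current = metadataUpdateVersion.getAsInt();
        if (current == lastSeenVersion) {
            return; // no new metadata since the last pass, so nothing to re-check
        }
        lastSeenVersion = current;
        for (String tp : assignedPartitions) {
            validated.add(tp); // placeholder: check tp's position against the new leader/epoch
        }
    }
}
```

Calling this on every poll is then cheap when metadata has not changed, which matches the goal in the PR title of reducing calls to `validateOffsetsIfNeeded`.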
[GitHub] [kafka] lct45 opened a new pull request #9141: MINOR: Improve checks for CogroupedStreamAggregateBuilder
lct45 opened a new pull request #9141: URL: https://github.com/apache/kafka/pull/9141 Updated `CogroupedStreamAggregateBuilder` to have a separate builder for each kind of windowed aggregation, plus one for non-windowed aggregation. This replaces the previous approach of passing every option into a single builder, with all but the current aggregation type set to null, and then checking which value was non-null. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
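The refactoring described above can be sketched in miniature: one entry point that probes nullable arguments versus one method per aggregation type. Everything below is hypothetical (the window-spec classes, method names, and return strings are illustrative stand-ins, not the Kafka Streams builder API); it shows only the shape of the change.

```java
import java.util.Objects;

public class CogroupBuilderSketch {

    // Hypothetical window specifications, not Kafka Streams' window classes.
    static final class TimeWindowSpec {
        final long sizeMs;
        TimeWindowSpec(long sizeMs) { this.sizeMs = sizeMs; }
    }

    static final class SessionWindowSpec {
        final long inactivityGapMs;
        SessionWindowSpec(long inactivityGapMs) { this.inactivityGapMs = inactivityGapMs; }
    }

    // Before: a single entry point. Callers pass null for every option except the
    // one they want, and the builder probes which argument is non-null.
    static String buildWithNullChecks(TimeWindowSpec windows, SessionWindowSpec sessionWindows) {
        if (windows != null) {
            return "time-windowed aggregate, size " + windows.sizeMs + " ms";
        } else if (sessionWindows != null) {
            return "session-windowed aggregate, gap " + sessionWindows.inactivityGapMs + " ms";
        }
        return "non-windowed aggregate";
    }

    // After: one builder method per aggregation type, so the intent is explicit
    // and no argument has to be null.
    static String build() {
        return "non-windowed aggregate";
    }

    static String buildTimeWindowed(TimeWindowSpec windows) {
        Objects.requireNonNull(windows, "windows");
        return "time-windowed aggregate, size " + windows.sizeMs + " ms";
    }

    static String buildSessionWindowed(SessionWindowSpec sessionWindows) {
        Objects.requireNonNull(sessionWindows, "sessionWindows");
        return "session-windowed aggregate, gap " + sessionWindows.inactivityGapMs + " ms";
    }
}
```

Both forms produce the same results; the split version simply removes the "which argument is set?" branching from the builder.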
[GitHub] [kafka] mimaison commented on pull request #9131: KAFKA-10367: Allow running the Streams demo app with a config file
mimaison commented on pull request #9131: URL: https://github.com/apache/kafka/pull/9131#issuecomment-670634953 Thanks @mjsax, I wasn't expecting a review this quickly! Sorry for not fully checking the changes before opening the PR.
[GitHub] [kafka] omkreddy commented on pull request #8878: MINOR: Generator config-specific HTML ids
omkreddy commented on pull request #8878: URL: https://github.com/apache/kafka/pull/8878#issuecomment-670626828 ok to test
[GitHub] [kafka] ableegoldman commented on pull request #8976: KIP-617: Allow Kafka Streams State Stores to be iterated backwards
ableegoldman commented on pull request #8976: URL: https://github.com/apache/kafka/pull/8976#issuecomment-670624337 Awesome, thanks @jeqo ! Let me know when the first PR is ready for review again (or is it ready now?)
[GitHub] [kafka] omkreddy closed pull request #9050: KAFKA-10193: Add preemption for controller events that have callbacks
omkreddy closed pull request #9050: URL: https://github.com/apache/kafka/pull/9050
[GitHub] [kafka] dajac commented on pull request #9140: KAFKA-10371; Partition reassignments can result in crashed ReplicaFetcherThreads
dajac commented on pull request #9140: URL: https://github.com/apache/kafka/pull/9140#issuecomment-670607229 Jira: https://issues.apache.org/jira/browse/KAFKA-10374
[jira] [Created] (KAFKA-10374) Add concurrency tests for the ReplicaManager
David Jacot created KAFKA-10374: --- Summary: Add concurrency tests for the ReplicaManager Key: KAFKA-10374 URL: https://issues.apache.org/jira/browse/KAFKA-10374 Project: Kafka Issue Type: Improvement Reporter: David Jacot We recently discovered a regression in the ReplicaManager that was caused by a concurrency issue: KAFKA-10371. We should add concurrency tests for this area of the broker to ensure that we catch similar issues in the future.
[GitHub] [kafka] hachikuji commented on pull request #9140: KAFKA-10371; Partition reassignments can result in crashed ReplicaFetcherThreads
hachikuji commented on pull request #9140: URL: https://github.com/apache/kafka/pull/9140#issuecomment-670606960 ok to test
[jira] [Updated] (KAFKA-10370) WorkerSinkTask: IllegalStateException caused by consumer.seek(tp, offsets) when (tp, offsets) are supplied by WorkerSinkTaskContext
[ https://issues.apache.org/jira/browse/KAFKA-10370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Zhang updated KAFKA-10370: --- Description: In WorkerSinkTask.java, when we want the consumer to start consuming from certain offsets, rather than from the last committed offset, [WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L295] is used to carry the offsets from the external world (e.g. an implementation of SinkTask). In the [poll() method|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L312], it first calls "rewind()" to (1) read the offsets from WorkerSinkTaskContext and (2) call consumer.seek(tp, offset) to rewind the consumer. When running (2), we saw the following IllegalStateException: {code:java} java.lang.IllegalStateException: No current assignment for partition mytopic-1 at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:251) at org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:276) at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1135) {code} As suggested in https://stackoverflow.com/questions/41008610/kafkaconsumer-0-10-java-api-error-message-no-current-assignment-for-partition/41010594, the resolution that has been initially verified is to use *consumer.assign* with *consumer.seek*, instead of *consumer.subscribe*. was: In WorkerSinkTask.java, when we want the consumer to start consuming from certain offsets, rather than from the last committed offset, [WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L295] is used to carry the offsets from the external world (e.g. an implementation of SinkTask).
In the [poll() method|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L312], it first calls "rewind()" to (1) read the offsets from WorkerSinkTaskContext and (2) call consumer.seek(tp, offset) to rewind the consumer. When running (2), we saw the following IllegalStateException: {code:java} java.lang.IllegalStateException: No current assignment for partition mytopic-1 at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:251) at org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:276) at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1135) {code} As suggested in https://stackoverflow.com/questions/41008610/kafkaconsumer-0-10-java-api-error-message-no-current-assignment-for-partition/41010594, the resolution is to use *consumer.assign* with *consumer.seek*, instead of *consumer.subscribe*. > WorkerSinkTask: IllegalStateException caused by consumer.seek(tp, offsets) > when (tp, offsets) are supplied by WorkerSinkTaskContext > -- > > Key: KAFKA-10370 > URL: https://issues.apache.org/jira/browse/KAFKA-10370 > Project: Kafka > Issue Type: New Feature > Components: KafkaConnect >Affects Versions: 2.5.0 >Reporter: Ning Zhang >Assignee: Ning Zhang >Priority: Major > Fix For: 2.6.0 > > > In WorkerSinkTask.java, when we want the consumer to start consuming from > certain offsets, rather than from the last committed offset, > [WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L295] > is used to carry the offsets from the external world (e.g. an implementation of > SinkTask).
> In the [poll() > method|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L312], > it first calls "rewind()" to (1) read the offsets from WorkerSinkTaskContext > and (2) call consumer.seek(tp, offset) to rewind the consumer. > When running (2), we saw the following IllegalStateException: > {code:java} > java.lang.IllegalStateException: No current assignment for partition mytopic-1 > at > org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:251) > at > org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:276) > at > org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1135) > {code} > As suggested in > https://stackoverflow.com/questions/41008610/kafkaconsumer-0-10-java-api-error-message-no-current-assignment-for-partition/41010594, > the resolution
[GitHub] [kafka] jeqo closed pull request #8976: KIP-617: Allow Kafka Streams State Stores to be iterated backwards
jeqo closed pull request #8976: URL: https://github.com/apache/kafka/pull/8976
[GitHub] [kafka] jeqo commented on pull request #8976: KIP-617: Allow Kafka Streams State Stores to be iterated backwards
jeqo commented on pull request #8976: URL: https://github.com/apache/kafka/pull/8976#issuecomment-670601500 Closing in favor of #9137, #9138 and #9139
[GitHub] [kafka] jeqo commented on pull request #8976: KIP-617: Allow Kafka Streams State Stores to be iterated backwards
jeqo commented on pull request #8976: URL: https://github.com/apache/kafka/pull/8976#issuecomment-670601289 @ableegoldman great feedback. It makes sense to split the changes into 3 different PRs. As there are dependencies between the stores, let's start with KeyValue, then Window, and finally Session store. I have created #9137, #9138, #9139 to be tackled in that order. PS. I'm considering all the comments you mentioned here. Thanks!
[GitHub] [kafka] dajac opened a new pull request #9140: KAFKA-10371; Partition reassignments can result in crashed ReplicaFetcherThreads
dajac opened a new pull request #9140: URL: https://github.com/apache/kafka/pull/9140 https://github.com/apache/kafka/pull/8672 introduced a bug that crashes the replica fetcher threads. The issue is that https://github.com/apache/kafka/pull/8672 deletes the Partitions before stopping the replica fetchers. Since the replica fetchers rely on accessing the Partition in the ReplicaManager, they crash with a NotLeaderOrFollowerException that is not handled. This PR restores the original ordering to avoid this issue. The regression was caught by our system test: `kafkatest.tests.core.reassign_partitions_test`. I have not managed to reproduce the issue in a unit test without reimplementing the entire system test in Java. I am not sure that makes sense, as we already have it in Python.
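The ordering problem described in the PR can be illustrated with a single-threaded toy model: a hook that runs between the two shutdown steps mimics a fetcher thread that happens to run at that moment. All names here are hypothetical, and this is not ReplicaManager code; it only shows why stopping the fetchers before deleting the Partitions is safe while the reverse order is not.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class StopReplicaOrderingSketch {
    // Toy model: partitions known to the "replica manager", and the
    // partitions a fetcher thread is still fetching.
    final Map<String, String> partitions = new HashMap<>();
    final Set<String> fetcherPartitions = new HashSet<>();

    /** One fetcher iteration: every partition it fetches must still exist. */
    void runFetcherOnce() {
        for (String tp : fetcherPartitions) {
            if (!partitions.containsKey(tp)) {
                // Per the PR description, the real fetcher hits an unhandled
                // NotLeaderOrFollowerException at this point.
                throw new IllegalStateException("fetcher crashed: partition " + tp + " is gone");
            }
        }
    }

    /** {@code interleavedFetch} runs between the two steps to mimic a concurrent fetcher. */
    void stopReplicas(Set<String> toStop, boolean stopFetchersFirst, Runnable interleavedFetch) {
        if (stopFetchersFirst) {
            fetcherPartitions.removeAll(toStop);   // 1. fetchers stop touching these partitions
            interleavedFetch.run();
            partitions.keySet().removeAll(toStop); // 2. only then delete the Partitions
        } else {
            partitions.keySet().removeAll(toStop); // regressed ordering: delete first
            interleavedFetch.run();                // fetcher still references the partition
            fetcherPartitions.removeAll(toStop);
        }
    }
}
```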
[GitHub] [kafka] hachikuji commented on a change in pull request #7222: KAFKA-8806 Reduce calls to validateOffsetsIfNeeded
hachikuji commented on a change in pull request #7222: URL: https://github.com/apache/kafka/pull/7222#discussion_r467139256 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java ## @@ -406,6 +407,10 @@ synchronized void maybeSeekUnvalidated(TopicPartition tp, long offset, OffsetRes return new HashSet<>(this.assignment.partitionSet()); } +public synchronized void forEachAssignedPartition(Consumer consumer) { Review comment: This is unused ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ## @@ -1142,6 +1156,7 @@ Node selectReadReplica(TopicPartition partition, Node leaderReplica, long curren Optional leaderOpt = position.currentLeader.leader; if (!leaderOpt.isPresent()) { +log.info("Requesting metadata update for partition {} since the position {} is missing the current leader node", partition, position); Review comment: This feels more like a debug level log to me. Was there a specific reason we needed it? ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ## @@ -1120,17 +1123,28 @@ Node selectReadReplica(TopicPartition partition, Node leaderReplica, long curren } } +/** + * If we have seen new metadata (as tracked by {@link org.apache.kafka.clients.Metadata#updateVersion()}), then + * we should check that all of the assignments have a valid position. + */ +private void maybeValidateAssignments() { Review comment: The name here threw me off because of its generality. I think we need to work metadata into the name to make it clearer. Maybe `validatePositionsOnMetadataChange` or `validatePositionsForLeaderChanges`.
[GitHub] [kafka] jeqo opened a new pull request #9139: KAFKA-9929: Support backward iterator on SessionStore
jeqo opened a new pull request #9139: URL: https://github.com/apache/kafka/pull/9139 Depends on https://github.com/apache/kafka/pull/9138 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.*
[GitHub] [kafka] jeqo opened a new pull request #9138: Backward windowstore
jeqo opened a new pull request #9138: URL: https://github.com/apache/kafka/pull/9138 Depends on https://github.com/apache/kafka/pull/9137
[GitHub] [kafka] jeqo opened a new pull request #9137: KAFKA-9929: Support reverse iterator on KeyValueStore
jeqo opened a new pull request #9137: URL: https://github.com/apache/kafka/pull/9137
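KIP-617 is about iterating state stores backwards. As a rough, hypothetical analogue (plain `java.util`, not the Kafka Streams store interfaces), a sorted in-memory store can serve the same key range in either direction. `NavigableMap#descendingMap` gives a reverse-ordered view of the range, so no re-sorting is needed. The method names `range`/`reverseRange` are chosen here for illustration only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;

public class ReverseRangeSketch {

    /** Keys in [from, to], in ascending order (forward iteration). */
    static List<String> range(NavigableMap<String, String> store, String from, String to) {
        return new ArrayList<>(store.subMap(from, true, to, true).keySet());
    }

    /** Same inclusive bounds, iterated from the largest key down to the smallest. */
    static List<String> reverseRange(NavigableMap<String, String> store, String from, String to) {
        return new ArrayList<>(store.subMap(from, true, to, true).descendingMap().keySet());
    }
}
```

Note that the bounds keep their meaning (`from` is still the lower bound) and only the iteration order flips.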
[jira] [Commented] (KAFKA-10329) Enable connector context in logs by default
[ https://issues.apache.org/jira/browse/KAFKA-10329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17173179#comment-17173179 ] Dongjin Lee commented on KAFKA-10329: - Hi [~rhauch], it seems this issue is related to [KIP-653: Upgrade log4j to log4j2|https://cwiki.apache.org/confluence/display/KAFKA/KIP-653%3A+Upgrade+log4j+to+log4j2]. Could you have a look? In my humble opinion, it would be good to handle both issues at once, since upgrading to log4j2 inevitably introduces some backward-compatibility breakage. > Enable connector context in logs by default > --- > > Key: KAFKA-10329 > URL: https://issues.apache.org/jira/browse/KAFKA-10329 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 3.0.0 >Reporter: Randall Hauch >Priority: Blocker > Labels: needs-kip > Fix For: 3.0.0 > > > When > [KIP-449|https://cwiki.apache.org/confluence/display/KAFKA/KIP-449%3A+Add+connector+contexts+to+Connect+worker+logs] > was implemented and released as part of AK 2.3, we chose not to enable this > extra logging context information by default because it was not backward > compatible: anyone relying upon the `connect-log4j.properties` file > provided by the AK distribution would, after an upgrade to AK 2.3 (or later), > see different formats for their logs, which could break any log processing > functionality they were relying upon. > However, we should enable this in AK 3.0, whenever that comes. Doing so will > require a fairly minor KIP to change the `connect-log4j.properties` file > slightly. > Marked this as BLOCKER since it's a backward-incompatible change that we > definitely want to make in the 3.0.0 release. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10352) Error while reading checkpoint file /tmp/kafka-logs/cleaner-offset-checkpoint (kafka.server.LogDirFailureChannel)
[ https://issues.apache.org/jira/browse/KAFKA-10352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17173151#comment-17173151 ] Dongjin Lee commented on KAFKA-10352: - Hi [~dorbae], it seems you are using `/tmp/kafka-logs` as a storage directory, right? If so, your OS may delete files in it when it cleans up the `/tmp` directory, and I think that is the likely cause of the exception. I strongly recommend not using `/tmp` as a storage directory; assign a dedicated device for storage instead. > Error while reading checkpoint file /tmp/kafka-logs/cleaner-offset-checkpoint > (kafka.server.LogDirFailureChannel) > - > > Key: KAFKA-10352 > URL: https://issues.apache.org/jira/browse/KAFKA-10352 > Project: Kafka > Issue Type: Bug > Components: log cleaner >Reporter: Seongbae Chang >Priority: Critical > > One of my Kafka brokers (3 in total, version 2.5.0) shut down suddenly. > Then the other brokers also shut down for similar reasons. > > The main cause of this problem is '*Error while reading checkpoint file > /tmp/kafka-logs/cleaner-offset-checkpoint (kafka.server.LogDirFailureChannel)* > *java.nio.file.NoSuchFileException: > /tmp/kafka-logs/cleaner-offset-checkpoint*' > > I don't know why this error occurs or how to solve it. Please give me > some answers or comments about it. Thank you. > I have attached the contents of log files such as kafkaServer.out and > log-cleaner.log. > > kafkaServer.out > {code:java} > [2020-07-30 19:49:05,992] INFO [GroupMetadataManager brokerId=3] Removed 0 > expired offsets in 0 milliseconds. > (kafka.coordinator.group.GroupMetadataManager)[2020-07-30 19:49:05,992] INFO > [GroupMetadataManager brokerId=3] Removed 0 expired offsets in 0 > milliseconds. 
(kafka.coordinator.group.GroupMetadataManager)[2020-07-30 > 19:56:48,080] ERROR Error while reading checkpoint file > /tmp/kafka-logs/cleaner-offset-checkpoint > (kafka.server.LogDirFailureChannel)java.nio.file.NoSuchFileException: > /tmp/kafka-logs/cleaner-offset-checkpoint at > sun.nio.fs.UnixException.translateToIOException(UnixException.java:86) at > sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102) at > sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107) at > sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:214) > at java.nio.file.Files.newByteChannel(Files.java:361) at > java.nio.file.Files.newByteChannel(Files.java:407) at > java.nio.file.spi.FileSystemProvider.newInputStream(FileSystemProvider.java:384) > at java.nio.file.Files.newInputStream(Files.java:152) at > java.nio.file.Files.newBufferedReader(Files.java:2784) at > java.nio.file.Files.newBufferedReader(Files.java:2816) at > kafka.server.checkpoints.CheckpointFile.liftedTree2$1(CheckpointFile.scala:87) > at kafka.server.checkpoints.CheckpointFile.read(CheckpointFile.scala:86) at > kafka.server.checkpoints.OffsetCheckpointFile.read(OffsetCheckpointFile.scala:61) > at > kafka.log.LogCleanerManager.$anonfun$allCleanerCheckpoints$2(LogCleanerManager.scala:134) > at scala.collection.Iterator$$anon$10.nextCur(Iterator.scala:583) at > scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:597) at > scala.collection.mutable.ListBuffer.addAll(ListBuffer.scala:118) at > scala.collection.mutable.ListBuffer$.from(ListBuffer.scala:38) at > scala.collection.immutable.List$.from(List.scala:617) at > scala.collection.immutable.List$.from(List.scala:611) at > scala.collection.IterableFactory$Delegate.from(Factory.scala:288) at > scala.collection.immutable.Iterable$.from(Iterable.scala:35) at > scala.collection.immutable.Iterable$.from(Iterable.scala:32) at > scala.collection.IterableFactory$Delegate.from(Factory.scala:288) at > 
scala.collection.IterableOps.flatMap(Iterable.scala:674) at > scala.collection.IterableOps.flatMap$(Iterable.scala:674) at > scala.collection.AbstractIterable.flatMap(Iterable.scala:921) at > kafka.log.LogCleanerManager.$anonfun$allCleanerCheckpoints$1(LogCleanerManager.scala:132) > at > kafka.log.LogCleanerManager.allCleanerCheckpoints(LogCleanerManager.scala:140) > at > kafka.log.LogCleanerManager.$anonfun$grabFilthiestCompactedLog$1(LogCleanerManager.scala:171) > at > kafka.log.LogCleanerManager.grabFilthiestCompactedLog(LogCleanerManager.scala:168) > at > kafka.log.LogCleaner$CleanerThread.cleanFilthiestLog(LogCleaner.scala:327) at > kafka.log.LogCleaner$CleanerThread.tryCleanFilthiestLog(LogCleaner.scala:314) > at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:303) at > kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)[2020-07-30 > 19:56:48,083] WARN [ReplicaManager broker=3] Stopping serving replicas in dir >
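Dongjin's advice comes down to keeping `log.dirs` off `/tmp`, which many systems clean periodically. Kafka's sample `config/server.properties` ships with `log.dirs=/tmp/kafka-logs`, which is convenient for a first try but easy to carry into production. A minimal pre-flight check along those lines could look like the following. `LogDirCheck` is a hypothetical helper, not part of Kafka, and it compares paths lexically only (it does not resolve symlinks).

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

// Hypothetical pre-flight check, not part of Kafka: flags log.dirs entries under /tmp,
// where OS temp-file cleanup may delete files such as cleaner-offset-checkpoint.
class LogDirCheck {
    // log.dirs is a comma-separated list of directories.
    static List<String> dirsUnderTmp(String logDirs) {
        Path tmp = Paths.get("/tmp").toAbsolutePath().normalize();
        List<String> risky = new ArrayList<>();
        for (String raw : logDirs.split(",")) {
            String dir = raw.trim();
            if (dir.isEmpty()) {
                continue;
            }
            // Path.startsWith compares whole name elements, so "/tmpfs" is not under "/tmp".
            Path p = Paths.get(dir).toAbsolutePath().normalize();
            if (p.startsWith(tmp)) {
                risky.add(dir);
            }
        }
        return risky;
    }

    public static void main(String[] args) {
        // Prints [/tmp/kafka-logs]
        System.out.println(dirsUnderTmp("/tmp/kafka-logs, /var/lib/kafka/data"));
    }
}
```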
[GitHub] [kafka] rajinisivaram commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API
rajinisivaram commented on a change in pull request #9032: URL: https://github.com/apache/kafka/pull/9032#discussion_r467052269 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ## @@ -4071,6 +4081,168 @@ void handleFailure(Throwable throwable) { return new AlterClientQuotasResult(Collections.unmodifiableMap(futures)); } +@Override +public DescribeUserScramCredentialsResult describeUserScramCredentials(List users, DescribeUserScramCredentialsOptions options) { +final KafkaFutureImpl> future = new KafkaFutureImpl<>(); +final long now = time.milliseconds(); +Call call = new Call("describeUserScramCredentials", calcDeadlineMs(now, options.timeoutMs()), +new ControllerNodeProvider()) { Review comment: Let's see what @cmccabe thinks. We can do whatever we intend to do for similar describe APIs in the KIP-500 world. If we are changing, then we should update the KIP and send a note to the KIP discussion thread.
[jira] [Created] (KAFKA-10373) Kafka Reassign Partition is stuck with Java OutOfMemory error
azher khan created KAFKA-10373: -- Summary: Kafka Reassign Partition is stuck with Java OutOfMemory error Key: KAFKA-10373 URL: https://issues.apache.org/jira/browse/KAFKA-10373 Project: Kafka Issue Type: Bug Components: KafkaConnect Affects Versions: 2.2.1 Reporter: azher khan Hi Team, While trying to run the Kafka script to reassign partitions of an existing topic, we are seeing a Java OutOfMemory issue. The heap for Kafka is set to "-Xmx1G -Xms1G" on the kafka broker. {code:java} /opt/kafka/bin/kafka-reassign-partitions.sh --zookeeper zookeeper1:2181 --reassignment-json-file topic_kafka_topic1_reassignment.json --bootstrap-server kafkabroker1:9092 --verify Status of partition reassignment: [2020-08-07 XX:XX:XX,] ERROR Uncaught exception in thread 'kafka-admin-client-thread | reassign-partitions-tool': (org.apache.kafka.common.utils.KafkaThread) java.lang.OutOfMemoryError: Java heap space at java.nio.HeapByteBuffer.(HeapByteBuffer.java:57) at java.nio.ByteBuffer.allocate(ByteBuffer.java:335) at org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30) at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:112) at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:335) at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:296) at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:560) at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:496) at org.apache.kafka.common.network.Selector.poll(Selector.java:425) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:510) at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1116) at java.lang.Thread.run(Thread.java:748) Reassignment of partition kafka_topic1-0 is still in progress Reassignment of partition kafka_topic1-1 is still in progress Reassignment of partition kafka_topic1-2 is still in progress{code} Retried the above command after removing 
the /admin/reassign_partitions znode from ZooKeeper as suggested, but we still see the same error. {code:java} [zk: localhost:2181(CONNECTED) 5] delete /admin/reassign_partitions [zk: localhost:2181(CONNECTED) 7] ls /admin [delete_topics] {code} We would highly appreciate your advice. Thank you in advance, Regards, Azher Khan
[jira] [Resolved] (KAFKA-10372) [JAVA 11] Unrecognized VM option PrintGCDateStamps
[ https://issues.apache.org/jira/browse/KAFKA-10372?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kevin Tibi resolved KAFKA-10372. Resolution: Abandoned. The error came from an Ansible role that overrides the script. > [JAVA 11] Unrecognized VM option PrintGCDateStamps > -- > > Key: KAFKA-10372 > URL: https://issues.apache.org/jira/browse/KAFKA-10372 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.6.0 >Reporter: Kevin Tibi >Priority: Blocker > Labels: bug > Fix For: 2.6.1 > > > Hello, > I can't start kafka with JAVA 11. > > {code:java} > kafka-server-start.sh[2721]: Unrecognized VM option 'PrintGCDateStamps' > kafka-server-start.sh[2721]: Error: Could not create the Java Virtual Machine. > kafka-server-start.sh[2721]: Error: A fatal exception has occurred. Program > will exit.{code} > This flag (in kafka-run-class.sh L302) is deprecated in JAVA 11. > Solution : > {code:java} > -Xlog:::time,uptime,level,tags" # Equivalent to PrintGCTimeStamps, Uptime and > PrintGCDateStamps{code}
[jira] [Updated] (KAFKA-9366) Upgrade log4j to log4j2
[ https://issues.apache.org/jira/browse/KAFKA-9366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dongjin Lee updated KAFKA-9366: --- Summary: Upgrade log4j to log4j2 (was: please consider upgrade log4j to log4j2 due to critical security problem CVE-2019-17571) > Upgrade log4j to log4j2 > --- > > Key: KAFKA-9366 > URL: https://issues.apache.org/jira/browse/KAFKA-9366 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.2.0, 2.1.1, 2.3.0, 2.4.0 >Reporter: leibo >Assignee: Dongjin Lee >Priority: Critical > Labels: needs-kip > > h2. CVE-2019-17571 Detail > Included in Log4j 1.2 is a SocketServer class that is vulnerable to > deserialization of untrusted data which can be exploited to remotely execute > arbitrary code when combined with a deserialization gadget when listening to > untrusted network traffic for log data. This affects Log4j versions from 1.2 > up to 1.2.17. > > [https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2019-17571] >
[jira] [Updated] (KAFKA-10372) [JAVA 11] Unrecognized VM option PrintGCDateStamps
[ https://issues.apache.org/jira/browse/KAFKA-10372?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kevin Tibi updated KAFKA-10372: --- Priority: Blocker (was: Major) > [JAVA 11] Unrecognized VM option PrintGCDateStamps > -- > > Key: KAFKA-10372 > URL: https://issues.apache.org/jira/browse/KAFKA-10372 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.6.0 >Reporter: Kevin Tibi >Priority: Blocker > Labels: bug > Fix For: 2.6.1 > > > Hello, > I can start kafka with JAVA 11. > > {code:java} > kafka-server-start.sh[2721]: Unrecognized VM option 'PrintGCDateStamps' > kafka-server-start.sh[2721]: Error: Could not create the Java Virtual Machine. > kafka-server-start.sh[2721]: Error: A fatal exception has occurred. Program > will exit.{code} > This flag (in kafka-run-class.sh L302) is deprecated in JAVA 11. > Solution : > {code:java} > -Xlog:::time,uptime,level,tags" # Equivalent to PrintGCTimeStamps, Uptime and > PrintGCDateStamps{code}
[jira] [Updated] (KAFKA-10372) [JAVA 11] Unrecognized VM option PrintGCDateStamps
[ https://issues.apache.org/jira/browse/KAFKA-10372?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kevin Tibi updated KAFKA-10372: --- Description: Hello, I can't start kafka with JAVA 11. {code:java} kafka-server-start.sh[2721]: Unrecognized VM option 'PrintGCDateStamps' kafka-server-start.sh[2721]: Error: Could not create the Java Virtual Machine. kafka-server-start.sh[2721]: Error: A fatal exception has occurred. Program will exit.{code} This flag (in kafka-run-class.sh L302) is deprecated in JAVA 11. Solution : {code:java} -Xlog:::time,uptime,level,tags" # Equivalent to PrintGCTimeStamps, Uptime and PrintGCDateStamps{code} was: Hello, I can start kafka with JAVA 11. {code:java} kafka-server-start.sh[2721]: Unrecognized VM option 'PrintGCDateStamps' kafka-server-start.sh[2721]: Error: Could not create the Java Virtual Machine. kafka-server-start.sh[2721]: Error: A fatal exception has occurred. Program will exit.{code} This flag (in kafka-run-class.sh L302) is deprecated in JAVA 11. Solution : {code:java} -Xlog:::time,uptime,level,tags" # Equivalent to PrintGCTimeStamps, Uptime and PrintGCDateStamps{code} > [JAVA 11] Unrecognized VM option PrintGCDateStamps > -- > > Key: KAFKA-10372 > URL: https://issues.apache.org/jira/browse/KAFKA-10372 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.6.0 >Reporter: Kevin Tibi >Priority: Blocker > Labels: bug > Fix For: 2.6.1 > > > Hello, > I can't start kafka with JAVA 11. > > {code:java} > kafka-server-start.sh[2721]: Unrecognized VM option 'PrintGCDateStamps' > kafka-server-start.sh[2721]: Error: Could not create the Java Virtual Machine. > kafka-server-start.sh[2721]: Error: A fatal exception has occurred. Program > will exit.{code} > This flag (in kafka-run-class.sh L302) is deprecated in JAVA 11. 
> Solution : > {code:java} > -Xlog:::time,uptime,level,tags" # Equivalent to PrintGCTimeStamps, Uptime and > PrintGCDateStamps{code}
[jira] [Updated] (KAFKA-10372) [JAVA 11] Unrecognized VM option PrintGCDateStamps
[ https://issues.apache.org/jira/browse/KAFKA-10372?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kevin Tibi updated KAFKA-10372: --- Fix Version/s: 2.6.1 > [JAVA 11] Unrecognized VM option PrintGCDateStamps > -- > > Key: KAFKA-10372 > URL: https://issues.apache.org/jira/browse/KAFKA-10372 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.6.0 >Reporter: Kevin Tibi >Priority: Major > Labels: bug > Fix For: 2.6.1 > > > Hello, > I can start kafka with JAVA 11. > > {code:java} > kafka-server-start.sh[2721]: Unrecognized VM option 'PrintGCDateStamps' > kafka-server-start.sh[2721]: Error: Could not create the Java Virtual Machine. > kafka-server-start.sh[2721]: Error: A fatal exception has occurred. Program > will exit.{code} > This flag (in kafka-run-class.sh L302) is deprecated in JAVA 11. > Solution : > {code:java} > -Xlog:::time,uptime,level,tags" # Equivalent to PrintGCTimeStamps, Uptime and > PrintGCDateStamps{code}
[jira] [Updated] (KAFKA-10372) [JAVA 11] Unrecognized VM option PrintGCDateStamps
[ https://issues.apache.org/jira/browse/KAFKA-10372?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kevin Tibi updated KAFKA-10372: --- Labels: bug (was: ) > [JAVA 11] Unrecognized VM option PrintGCDateStamps > -- > > Key: KAFKA-10372 > URL: https://issues.apache.org/jira/browse/KAFKA-10372 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.6.0 >Reporter: Kevin Tibi >Priority: Major > Labels: bug > > Hello, > I can start kafka with JAVA 11. > > {code:java} > kafka-server-start.sh[2721]: Unrecognized VM option 'PrintGCDateStamps' > kafka-server-start.sh[2721]: Error: Could not create the Java Virtual Machine. > kafka-server-start.sh[2721]: Error: A fatal exception has occurred. Program > will exit.{code} > This flag (in kafka-run-class.sh L302) is deprecated in JAVA 11. > Solution : > {code:java} > -Xlog:::time,uptime,level,tags" # Equivalent to PrintGCTimeStamps, Uptime and > PrintGCDateStamps{code}
[jira] [Created] (KAFKA-10372) [JAVA 11] Unrecognized VM option PrintGCDateStamps
Kevin Tibi created KAFKA-10372: -- Summary: [JAVA 11] Unrecognized VM option PrintGCDateStamps Key: KAFKA-10372 URL: https://issues.apache.org/jira/browse/KAFKA-10372 Project: Kafka Issue Type: Bug Components: core Affects Versions: 2.6.0 Reporter: Kevin Tibi Hello, I can start kafka with JAVA 11. {code:java} kafka-server-start.sh[2721]: Unrecognized VM option 'PrintGCDateStamps' kafka-server-start.sh[2721]: Error: Could not create the Java Virtual Machine. kafka-server-start.sh[2721]: Error: A fatal exception has occurred. Program will exit.{code} This flag (in kafka-run-class.sh L302) is deprecated in JAVA 11. Solution : {code:java} -Xlog:::time,uptime,level,tags" # Equivalent to PrintGCTimeStamps, Uptime and PrintGCDateStamps{code}
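On JDK 9 and later, unified logging (`-Xlog`) replaced the legacy GC-logging flags, which is why `PrintGCDateStamps` is rejected at startup. The stock `kafka-run-class.sh` appears to branch on the Java version when it builds its GC options, which fits the resolution above that an Ansible role had overridden the script. The sketch below shows that kind of branch in Java. `GcLogFlags` and its methods are hypothetical, and the exact flag strings Kafka's script uses may differ.

```java
import java.util.List;

// Hypothetical helper, not part of Kafka: chooses GC-logging JVM options from a
// "java.version" string, mirroring the kind of branch a launcher script needs.
class GcLogFlags {
    // "1.8.0_252" -> 8, "11.0.8" -> 11, "17" -> 17, "9-ea" -> 9
    static int featureVersion(String javaVersion) {
        String v = javaVersion.startsWith("1.") ? javaVersion.substring(2) : javaVersion;
        int end = 0;
        while (end < v.length() && Character.isDigit(v.charAt(end))) {
            end++;
        }
        return Integer.parseInt(v.substring(0, end));
    }

    static List<String> flagsFor(String javaVersion, String gcLogFile) {
        if (featureVersion(javaVersion) >= 9) {
            // JDK 9+ unified logging: one -Xlog option with decorators replaces the old
            // Print* flags; PrintGCDateStamps itself is no longer recognized there.
            return List.of("-Xlog:gc*:file=" + gcLogFile + ":time,uptime,level,tags");
        }
        // JDK 8 still accepts the legacy GC-logging flags.
        return List.of("-Xloggc:" + gcLogFile, "-verbose:gc",
                "-XX:+PrintGCDetails", "-XX:+PrintGCDateStamps", "-XX:+PrintGCTimeStamps");
    }

    public static void main(String[] args) {
        System.out.println(flagsFor(System.getProperty("java.version"), "kafkaServer-gc.log"));
    }
}
```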
[GitHub] [kafka] tombentley commented on pull request #9136: KAFKA-10211: Add DirectoryConfigProvider
tombentley commented on pull request #9136: URL: https://github.com/apache/kafka/pull/9136#issuecomment-670438137 @omkreddy, @mimaison, @rajinisivaram, @kkonstantine, @dajac thanks for voting on the KIP. I'd be grateful for a review.
[GitHub] [kafka] tombentley opened a new pull request #9136: KAFKA-10211: Add DirectoryConfigProvider
tombentley opened a new pull request #9136: URL: https://github.com/apache/kafka/pull/9136 See KIP-632.
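As I understand KIP-632, the new provider treats a directory as a set of key/value pairs: each file name is a key and the file's contents are its value, which suits secrets mounted one file per key. The sketch below shows only that mapping, using the JDK's `java.nio.file` API. `DirectoryValues` is hypothetical; it is not the `DirectoryConfigProvider` in the PR, which implements Kafka's `ConfigProvider` interface and may filter files differently.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Stream;

// Hypothetical sketch of the directory-as-config idea, not Kafka's DirectoryConfigProvider.
class DirectoryValues {
    // Maps each regular file directly inside dir to its UTF-8 contents, keyed by file name.
    // Subdirectories are skipped; nothing is read recursively.
    static Map<String, String> read(Path dir) {
        Map<String, String> values = new TreeMap<>();
        try (Stream<Path> files = Files.list(dir)) {
            files.filter(Files::isRegularFile).forEach(p -> {
                try {
                    values.put(p.getFileName().toString(),
                            new String(Files.readAllBytes(p), StandardCharsets.UTF_8));
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return values;
    }

    public static void main(String[] args) {
        // Usage: java DirectoryValues.java /path/to/mounted/secrets
        // Prints only value lengths so secrets do not end up on the console.
        read(Paths.get(args[0])).forEach((k, v) -> System.out.println(k + " -> " + v.length() + " chars"));
    }
}
```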
[GitHub] [kafka] tombentley commented on pull request #8878: MINOR: Generator config-specific HTML ids
tombentley commented on pull request #8878: URL: https://github.com/apache/kafka/pull/8878#issuecomment-670404348 Thinking about it, we could avoid breaking existing links by generating both old and new style ids: `whatever.config.name`. WDYT @mimaison @omkreddy?
[GitHub] [kafka] tom1299 opened a new pull request #9135: MINOR [WIP] Refactor Producer class in examples
tom1299 opened a new pull request #9135: URL: https://github.com/apache/kafka/pull/9135 I'm new to the Kafka project and am going through the examples. Along the way I made a very small refactoring to the Producer class: * Convert the Callback class into a method. The aim was to simplify the whole class and introduce the lambda / functional programming style available in current Java versions. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
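Kafka's producer `Callback` has a single abstract method, so a dedicated callback class can be replaced by an ordinary method plus a lambda. The sketch below contrasts the two styles without depending on kafka-clients: `SendCallback` is a hypothetical stand-in for `Callback`, and `DemoCallBack`, `onSendCompletion` and `callbackFor` are illustrative names rather than the code in the PR.

```java
import java.util.ArrayList;
import java.util.List;

class CallbackRefactor {
    // Hypothetical stand-in for Kafka's producer Callback, which also has a single
    // abstract method, so the sketch compiles without kafka-clients on the classpath.
    interface SendCallback {
        void onCompletion(String metadata, Exception exception);
    }

    static final List<String> log = new ArrayList<>();

    // Before: a dedicated class whose fields exist only to carry per-send state.
    static class DemoCallBack implements SendCallback {
        private final int key;

        DemoCallBack(int key) {
            this.key = key;
        }

        @Override
        public void onCompletion(String metadata, Exception exception) {
            log.add(exception != null
                    ? "key " + key + " failed: " + exception.getMessage()
                    : "key " + key + " sent to " + metadata);
        }
    }

    // After: an ordinary method holds the completion logic ...
    static void onSendCompletion(int key, String metadata, Exception exception) {
        log.add(exception != null
                ? "key " + key + " failed: " + exception.getMessage()
                : "key " + key + " sent to " + metadata);
    }

    // ... and a lambda captures the per-send state where the callback is created.
    static SendCallback callbackFor(int key) {
        return (metadata, exception) -> onSendCompletion(key, metadata, exception);
    }

    public static void main(String[] args) {
        new DemoCallBack(1).onCompletion("demo-0@42", null);
        callbackFor(1).onCompletion("demo-0@42", null);
        System.out.println(log); // [key 1 sent to demo-0@42, key 1 sent to demo-0@42]
    }
}
```

With the real client, the lambda form would be passed straight to `send`, e.g. `producer.send(record, (metadata, exception) -> ...)`.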
[GitHub] [kafka] chia7712 opened a new pull request #9134: MINOR: set initial capacity for all array buffers in converting produ…
chia7712 opened a new pull request #9134: URL: https://github.com/apache/kafka/pull/9134 This is a small optimization for converting producer responses. These array buffers need an initial capacity to avoid growing repeatedly when the request carries many small messages that are sent to different partitions. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
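The PR targets Scala array buffers on the broker, but the idea carries over directly: when the number of entries is known before the loop, allocate the collection at that size instead of letting it grow. A Java sketch with a hypothetical conversion method (per-partition error codes to response entries), not the code in the PR:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical conversion step, not Kafka code: per-partition error codes -> response entries.
class PresizedConversion {
    static List<String> toResponseEntries(Map<String, Short> errorsByPartition) {
        // The final size is known before the loop, so allocate the backing array once
        // instead of letting a default-sized list grow (allocate and copy) as it fills.
        List<String> entries = new ArrayList<>(errorsByPartition.size());
        for (Map.Entry<String, Short> e : errorsByPartition.entrySet()) {
            entries.add(e.getKey() + ":" + e.getValue());
        }
        return entries;
    }

    public static void main(String[] args) {
        Map<String, Short> errors = new TreeMap<>();
        errors.put("foo-0", (short) 0);
        errors.put("foo-1", (short) 6);
        System.out.println(toResponseEntries(errors)); // [foo-0:0, foo-1:6]
    }
}
```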
[jira] [Assigned] (KAFKA-10371) Partition reassignments can result in crashed ReplicaFetcherThreads.
[ https://issues.apache.org/jira/browse/KAFKA-10371?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot reassigned KAFKA-10371: --- Assignee: David Jacot > Partition reassignments can result in crashed ReplicaFetcherThreads. > > > Key: KAFKA-10371 > URL: https://issues.apache.org/jira/browse/KAFKA-10371 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.7.0 >Reporter: Steve Rodrigues >Assignee: David Jacot >Priority: Critical > > A Kafka system doing partition reassignments got stuck with the reassignment > partially done and the system with a non-zero number of URPs and increasing > max lag. > Looking in the logs, we see: > {noformat} > [ERROR] 2020-07-31 21:22:23,984 [ReplicaFetcherThread-0-3] > kafka.server.ReplicaFetcherThread - [ReplicaFetcher replicaId=4, leaderId=3, > fetcherId=0] Error due to > org.apache.kafka.common.errors.NotLeaderOrFollowerException: Error while > fetching partition state for foo > [INFO] 2020-07-31 21:22:23,986 [ReplicaFetcherThread-0-3] > kafka.server.ReplicaFetcherThread - [ReplicaFetcher replicaId=4, leaderId=3, > fetcherId=0] Stopped > {noformat} > Investigating further and with some helpful changes to the exception (which > was not generating a stack trace because it was a client-side exception), we > see on a test run: > {noformat} > [2020-08-06 19:58:21,592] ERROR [ReplicaFetcher replicaId=2, leaderId=1, > fetcherId=0] Error due to (kafka.server.ReplicaFetcherThread) > org.apache.kafka.common.errors.NotLeaderOrFollowerException: Error while > fetching partition state for topic-test-topic-85 > at org.apache.kafka.common.protocol.Errors.exception(Errors.java:415) > at > kafka.server.ReplicaManager.getPartitionOrException(ReplicaManager.scala:645) > at > kafka.server.ReplicaManager.localLogOrException(ReplicaManager.scala:672) > at > kafka.server.ReplicaFetcherThread.logStartOffset(ReplicaFetcherThread.scala:133) > at > 
kafka.server.ReplicaFetcherThread.$anonfun$buildFetch$1(ReplicaFetcherThread.scala:316) > at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:553) > at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:551) > at scala.collection.AbstractIterable.foreach(Iterable.scala:920) > at > kafka.server.ReplicaFetcherThread.buildFetch(ReplicaFetcherThread.scala:309) > {noformat} > It appears that the fetcher is attempting to fetch for a partition that has > been reassigned away. From further investigation, it seems that in > KAFKA-10002 the StopReplica code was changed from: > 1. Remove partition from fetcher > 2. Remove partition from partition map > to the other way around, but now the fetcher may race and attempt to build a > fetch for a partition that's no longer mapped. In particular, since the > localLogOrException code is being called from logStartOffset, which isn't protected > against NotLeaderOrFollowerException, just against KafkaStorageException, the > exception isn't caught and propagates all the way out, killing the replica > fetcher thread. > We need to switch this back.
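The race described above can be replayed deterministically by running only the first StopReplica step and then building a fetch. The model below is hypothetical and heavily simplified: a `ConcurrentHashMap` stands in for the partition map, a concurrent set for the fetcher's partitions, and `IllegalStateException` for `NotLeaderOrFollowerException`. It shows only the interleaving between the two steps, not the fetcher thread's own synchronization.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified model of the StopReplica / fetcher interaction; not Kafka code.
class StopReplicaOrdering {
    // Stand-in for the broker's partition map: partition -> log start offset.
    final Map<String, Long> partitionMap = new ConcurrentHashMap<>();
    // Stand-in for the set of partitions a fetcher thread is fetching.
    final Set<String> fetcherPartitions = ConcurrentHashMap.newKeySet();

    void addPartition(String tp, long logStartOffset) {
        partitionMap.put(tp, logStartOffset);
        fetcherPartitions.add(tp);
    }

    // Mimics buildFetch: every fetched partition must still be in the partition map,
    // otherwise it throws, as the localLogOrException lookup does in the report.
    List<String> buildFetch() {
        List<String> fetch = new ArrayList<>();
        for (String tp : fetcherPartitions) {
            Long logStartOffset = partitionMap.get(tp);
            if (logStartOffset == null) {
                throw new IllegalStateException("Error while fetching partition state for " + tp);
            }
            fetch.add(tp + "@" + logStartOffset);
        }
        return fetch;
    }

    // Returns the two StopReplica steps in the requested order so a caller can
    // interleave a buildFetch() call between them.
    List<Runnable> stopReplicaSteps(String tp, boolean fetcherFirst) {
        Runnable removeFromFetcher = () -> fetcherPartitions.remove(tp);
        Runnable removeFromMap = () -> partitionMap.remove(tp);
        return fetcherFirst ? List.of(removeFromFetcher, removeFromMap)
                            : List.of(removeFromMap, removeFromFetcher);
    }

    public static void main(String[] args) {
        StopReplicaOrdering model = new StopReplicaOrdering();
        model.addPartition("foo-0", 0L);
        // Post-KAFKA-10002 order: partition map first, so a fetch built in between fails.
        model.stopReplicaSteps("foo-0", false).get(0).run();
        try {
            model.buildFetch();
        } catch (IllegalStateException e) {
            System.out.println("fetcher would die: " + e.getMessage());
        }
    }
}
```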