[GitHub] [kafka] huxihx commented on pull request #9051: KAFKA-10268: dynamic config like "--delete-config log.retention.ms" doesn't work

2020-07-21 Thread GitBox


huxihx commented on pull request #9051:
URL: https://github.com/apache/kafka/pull/9051#issuecomment-662260819


   @rajinisivaram Please review this patch. Thanks.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10134) High CPU issue during rebalance in Kafka consumer after upgrading to 2.5

2020-07-21 Thread Jerry Wei (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17162450#comment-17162450
 ] 

Jerry Wei commented on KAFKA-10134:
---

[~guozhang] Referring to the attached file (consumer5.log.2020-07-22.log), the last 
re-join happened at "10:52:41.247 [pool-1-thread-1] INFO  
o.a.k.c.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer5, 
groupId=consumerGroupId] (Re-)joining group" because one consumer was 
trying to join. Interestingly, this time looks better than before, but it 
still took more than one minute.

> High CPU issue during rebalance in Kafka consumer after upgrading to 2.5
> 
>
> Key: KAFKA-10134
> URL: https://issues.apache.org/jira/browse/KAFKA-10134
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.5.0
>Reporter: Sean Guo
>Assignee: Guozhang Wang
>Priority: Blocker
> Fix For: 2.6.0, 2.5.1
>
> Attachments: consumer5.log.2020-07-22.log
>
>
> We want to utilize the new rebalance protocol to mitigate the stop-the-world 
> effect during rebalances, as our tasks are long-running.
> But after the upgrade, when we kill an instance to trigger a rebalance under 
> load (some long-running tasks take >30s), the CPU usage goes sky-high. It 
> reads ~700% in our metrics, so several threads are likely in a tight loop. 
> We have several consumer threads consuming from different partitions during 
> the rebalance. This is reproducible with both the new 
> CooperativeStickyAssignor and the old eager rebalance protocol. The 
> difference is that with the old eager protocol the high CPU usage drops 
> once the rebalance is done, while with the cooperative one the consumer 
> threads seem stuck on something and cannot finish the rebalance, so the high 
> CPU usage does not drop until we stop the load. A small load without 
> long-running tasks also does not cause sustained high CPU usage, since the 
> rebalance can finish in that case.
>  
> "executor.kafka-consumer-executor-4" #124 daemon prio=5 os_prio=0 
> cpu=76853.07ms elapsed=841.16s tid=0x7fe11f044000 nid=0x1f4 runnable 
> [0x7fe119aab000]
>    java.lang.Thread.State: RUNNABLE
>      at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:467)
>      at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275)
>      at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1241)
>      at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
>  
> By debugging into the code, we found that the clients appear to be in a loop 
> trying to find the coordinator.
> I also tried the old rebalance protocol on the new version; the issue still 
> exists, but the CPU returns to normal when the rebalance is done.
> The same test on 2.4.1 does not seem to show this issue, so it appears 
> related to something that changed between 2.4.1 and 2.5.0.
>  
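The tight loop described above (repeatedly re-checking for a coordinator without yielding) can be contrasted with a bounded backoff. This is a generic sketch, not taken from the Kafka client code; the function and lookup names are illustrative:

```python
import time

def find_coordinator_with_backoff(lookup, max_backoff_s=1.0, timeout_s=30.0):
    """Poll lookup() until it returns a coordinator, sleeping with
    exponential backoff between attempts instead of spinning."""
    backoff_s = 0.001
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        coordinator = lookup()
        if coordinator is not None:
            return coordinator
        time.sleep(backoff_s)  # yields the CPU between attempts
        backoff_s = min(backoff_s * 2, max_backoff_s)
    raise TimeoutError("coordinator not found within timeout")

# Toy lookup that succeeds on the third call.
calls = {"n": 0}
def lookup():
    calls["n"] += 1
    return "broker-1" if calls["n"] >= 3 else None

print(find_coordinator_with_backoff(lookup))  # broker-1
```

Without the `time.sleep`, each thread stuck in such a loop would peg one core, which is consistent with the ~700% CPU reading reported for several consumer threads.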



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10134) High CPU issue during rebalance in Kafka consumer after upgrading to 2.5

2020-07-21 Thread Jerry Wei (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jerry Wei updated KAFKA-10134:
--
Attachment: (was: consumer4.log.2020-07-22.log)



[jira] [Updated] (KAFKA-10134) High CPU issue during rebalance in Kafka consumer after upgrading to 2.5

2020-07-21 Thread Jerry Wei (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jerry Wei updated KAFKA-10134:
--
Attachment: consumer5.log.2020-07-22.log



[jira] [Updated] (KAFKA-10134) High CPU issue during rebalance in Kafka consumer after upgrading to 2.5

2020-07-21 Thread Jerry Wei (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jerry Wei updated KAFKA-10134:
--
Attachment: consumer4.log.2020-07-22.log



[jira] [Resolved] (KAFKA-9343) Add ps command for Kafka and zookeeper process on z/OS.

2020-07-21 Thread Shuo Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9343?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shuo Zhang resolved KAFKA-9343.
---

> Add ps command for Kafka and zookeeper process on z/OS.
> ---
>
> Key: KAFKA-9343
> URL: https://issues.apache.org/jira/browse/KAFKA-9343
> Project: Kafka
>  Issue Type: Task
>  Components: tools
>Affects Versions: 2.4.0
> Environment: z/OS, OS/390
>Reporter: Shuo Zhang
>Priority: Major
>  Labels: OS/390, z/OS
> Fix For: 2.4.2, 2.5.0
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> +Note: since the final change scope changed, I changed the summary and 
> description.+ 
> The existing method of checking for the Kafka process on other platforms is 
> not applicable to z/OS; on z/OS, the best keyword we can use is the JOBNAME. 
> PIDS=$(ps ax | grep -i 'kafka\.Kafka' | grep java | grep -v grep | awk 
> '{print $1}') 
> --> 
> PIDS=$(ps -A -o pid,jobname,comm | grep -i $JOBNAME | grep java | grep -v 
> grep | awk '{print $1}') 
> The same applies to the ZooKeeper process.





[jira] [Commented] (KAFKA-10269) AdminClient ListOffsetsResultInfo/timestamp is always -1

2020-07-21 Thread huxihx (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17162444#comment-17162444
 ] 

huxihx commented on KAFKA-10269:


[~d-t-w] Thanks for reporting; feel free to take this ticket.

> AdminClient ListOffsetsResultInfo/timestamp is always -1
> 
>
> Key: KAFKA-10269
> URL: https://issues.apache.org/jira/browse/KAFKA-10269
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Affects Versions: 2.5.0
>Reporter: Derek Troy-West
>Priority: Minor
>
> When using AdminClient/listOffsets, the resulting ListOffsetsResultInfos appear 
> to always have a timestamp of -1.
> I've run listOffsets against live clusters with multiple Kafka versions (from 
> 1.0 to 2.5), with both CreateTime and LogAppendTime as the 
> message.timestamp.type; every result has a -1 timestamp.
> e.g. 
> {{org.apache.kafka.clients.admin.ListOffsetsResult$ListOffsetsResultInfo#0x5c3a771}}
> {{ListOffsetsResultInfo(offset=23016, timestamp=-1, leaderEpoch=Optional[0])}}
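As background for the report above: Kafka uses -1 as its "no timestamp" sentinel, so clients decoding a ListOffsets result should treat -1 as absent rather than as a real epoch-millis value. A minimal sketch of that convention (the helper name is illustrative, not from any Kafka client):

```python
# -1 is Kafka's sentinel for "no timestamp" (RecordBatch.NO_TIMESTAMP).
NO_TIMESTAMP = -1

def decode_timestamp(ts_ms):
    """Map Kafka's sentinel to None; pass real timestamps through."""
    return None if ts_ms == NO_TIMESTAMP else ts_ms

print(decode_timestamp(-1))            # None
print(decode_timestamp(1595289600000)) # 1595289600000
```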
>  
>  





[GitHub] [kafka] huxihx opened a new pull request #9051: KAFKA-10268: dynamic config like "--delete-config log.retention.ms" doesn't work

2020-07-21 Thread GitBox


huxihx opened a new pull request #9051:
URL: https://github.com/apache/kafka/pull/9051


   https://issues.apache.org/jira/browse/KAFKA-10268
   
   Currently, ConfigCommand's `--delete-config` API does not restore the config 
to its default value, whether at the per-broker level or the broker-default 
level. The Admin.incrementalAlterConfigs API also runs into this problem. This 
patch fixes it by removing the corresponding config from the `newConfig` 
properties when reconfiguring dynamic broker config.
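The idea behind the fix can be sketched generically (the dict-based config model below is illustrative, not Kafka's internal representation): a deleted dynamic override must be removed from the effective config, not left behind, so that lookups fall back to the default value.

```python
# Illustrative defaults; not Kafka's actual internals.
DEFAULTS = {"log.retention.ms": 604800000}  # static/default value (7 days)

def effective_config(defaults, dynamic_overrides):
    """Dynamic overrides shadow defaults while they exist."""
    merged = dict(defaults)
    merged.update(dynamic_overrides)
    return merged

dynamic = {"log.retention.ms": 1000}  # dynamic override in place
assert effective_config(DEFAULTS, dynamic)["log.retention.ms"] == 1000

# Deleting the dynamic config: drop the key entirely so the default
# is restored (leaving a stale entry would keep shadowing the default).
dynamic.pop("log.retention.ms", None)
print(effective_config(DEFAULTS, dynamic)["log.retention.ms"])  # 604800000
```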
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] jeffkbkim opened a new pull request #9050: KAFKA-10193: Add preemption for controller events that have callbacks

2020-07-21 Thread GitBox


jeffkbkim opened a new pull request #9050:
URL: https://github.com/apache/kafka/pull/9050


   JIRA: https://issues.apache.org/jira/browse/KAFKA-10193
   * add a `preempt(): Unit` method to `ControllerEvent` so that all events 
(and future events) must implement it
   * for events that have callbacks, move the preemption from individual 
methods into `preempt()`
   * add preemption for `ApiPartitionReassignment` and 
`ListPartitionReassignments`
   * add integration tests:
   1. test whether `preempt()` is called when the controller shuts down
   2. test whether the events with callbacks receive the correct error response 
(`NOT_CONTROLLER`)
   * explicit typing for `ControllerEvent` methods
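The steps above can be sketched generically. This is a hedged Python illustration of the preemption pattern (Kafka's controller is Scala; the class and queue names here are illustrative): every queued event implements `preempt()`, and shutdown drains the queue by preempting pending events so their callbacks receive an error response.

```python
from abc import ABC, abstractmethod

class ControllerEvent(ABC):
    @abstractmethod
    def process(self): ...
    @abstractmethod
    def preempt(self): ...

class ListReassignments(ControllerEvent):
    """Toy event with a callback, standing in for ListPartitionReassignments."""
    def __init__(self, callback):
        self.callback = callback
    def process(self):
        self.callback("OK")
    def preempt(self):
        # On shutdown the caller must still get an answer.
        self.callback("NOT_CONTROLLER")

queue, results = [], []
queue.append(ListReassignments(results.append))

# Controller shutting down: preempt everything still queued.
while queue:
    queue.pop(0).preempt()

print(results)  # ['NOT_CONTROLLER']
```

Making `preempt()` abstract forces every future event type to decide explicitly what happens to its callers on shutdown, instead of silently dropping them.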
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] mjsax commented on pull request #9047: KAFKA-9274: remove `retries` for global tasks

2020-07-21 Thread GitBox


mjsax commented on pull request #9047:
URL: https://github.com/apache/kafka/pull/9047#issuecomment-662192375


   @vvcephei Updated this PR.







[GitHub] [kafka] mjsax commented on a change in pull request #9047: KAFKA-9274: remove `retries` for global tasks

2020-07-21 Thread GitBox


mjsax commented on a change in pull request #9047:
URL: https://github.com/apache/kafka/pull/9047#discussion_r458483259



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
##
@@ -612,72 +617,521 @@ public boolean lockGlobalState() throws IOException {
 }
 }
 
-@SuppressWarnings("deprecation") // TODO revisit in follow up PR
 @Test
-public void shouldRetryWhenEndOffsetsThrowsTimeoutException() {
-final int retries = 2;
+public void 
shouldNotRetryWhenEndOffsetsThrowsTimeoutExceptionAndTaskTimeoutIsZero() {
 final AtomicInteger numberOfCalls = new AtomicInteger(0);
 consumer = new MockConsumer(OffsetResetStrategy.EARLIEST) {
 @Override
-public synchronized Map endOffsets(final 
Collection partitions) {
+public synchronized Map endOffsets(final 
Collection partitions) {
 numberOfCalls.incrementAndGet();
-throw new TimeoutException();
+throw new TimeoutException("KABOOM!");
 }
 };
+initializeConsumer(0, 0, t1, t2, t3, t4);
+
 streamsConfig = new StreamsConfig(new Properties() {
 {
 put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
 put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
 put(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory().getPath());
-put(StreamsConfig.RETRIES_CONFIG, retries);
+put(StreamsConfig.TASK_TIMEOUT_MS_CONFIG, 0L);
 }
 });
 
-try {
-new GlobalStateManagerImpl(
-new LogContext("mock"),
-topology,
-consumer,
-stateDirectory,
-stateRestoreListener,
-streamsConfig);
-} catch (final StreamsException expected) {
-assertEquals(numberOfCalls.get(), retries);
-}
+stateManager = new GlobalStateManagerImpl(
+new LogContext("mock"),
+time,
+topology,
+consumer,
+stateDirectory,
+stateRestoreListener,
+streamsConfig
+);
+processorContext.setStateManger(stateManager);
+stateManager.setGlobalProcessorContext(processorContext);
+
+final StreamsException expected = assertThrows(
+StreamsException.class,
+() -> stateManager.initialize()
+);
+final Throwable cause = expected.getCause();
+assertThat(cause, instanceOf(TimeoutException.class));
+assertThat(cause.getMessage(), equalTo("KABOOM!"));
+
+assertEquals(numberOfCalls.get(), 1);
 }
 
-@SuppressWarnings("deprecation") // TODO revisit in follow up PR
 @Test
-public void shouldRetryWhenPartitionsForThrowsTimeoutException() {
-final int retries = 2;
+public void shouldRetryAtLeastOnceWhenEndOffsetsThrowsTimeoutException() {
 final AtomicInteger numberOfCalls = new AtomicInteger(0);
 consumer = new MockConsumer(OffsetResetStrategy.EARLIEST) {
 @Override
-public synchronized List partitionsFor(final String 
topic) {
+public synchronized Map endOffsets(final 
Collection partitions) {
+time.sleep(100L);
 numberOfCalls.incrementAndGet();
-throw new TimeoutException();
+throw new TimeoutException("KABOOM!");
 }
 };
+initializeConsumer(0, 0, t1, t2, t3, t4);
+
 streamsConfig = new StreamsConfig(new Properties() {
 {
 put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
 put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
 put(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory().getPath());
-put(StreamsConfig.RETRIES_CONFIG, retries);
+put(StreamsConfig.TASK_TIMEOUT_MS_CONFIG, 1L);
 }
 });
 
-try {
-new GlobalStateManagerImpl(
-new LogContext("mock"),
-topology,
-consumer,
-stateDirectory,
-stateRestoreListener,
-streamsConfig);
-} catch (final StreamsException expected) {
-assertEquals(numberOfCalls.get(), retries);
-}
+stateManager = new GlobalStateManagerImpl(
+new LogContext("mock"),
+time,
+topology,
+consumer,
+stateDirectory,
+stateRestoreListener,
+streamsConfig
+);
+processorContext.setStateManger(stateManager);
+stateManager.setGlobalProcessorContext(processorContext);
+
+final TimeoutException expected = assertThrows(
+TimeoutException.class,
+() -> stateManager.in

[GitHub] [kafka] mjsax commented on a change in pull request #9047: KAFKA-9274: remove `retries` for global tasks

2020-07-21 Thread GitBox


mjsax commented on a change in pull request #9047:
URL: https://github.com/apache/kafka/pull/9047#discussion_r458483023



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
##
@@ -612,72 +617,521 @@ public boolean lockGlobalState() throws IOException {

[GitHub] [kafka] mjsax commented on a change in pull request #9047: KAFKA-9274: remove `retries` for global tasks

2020-07-21 Thread GitBox


mjsax commented on a change in pull request #9047:
URL: https://github.com/apache/kafka/pull/9047#discussion_r458482846



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
##
@@ -612,72 +617,521 @@ public boolean lockGlobalState() throws IOException {
-@SuppressWarnings("deprecation") // TODO revisit in follow up PR

Review comment:
   Third TODO









[GitHub] [kafka] mjsax commented on a change in pull request #9047: KAFKA-9274: remove `retries` for global tasks

2020-07-21 Thread GitBox


mjsax commented on a change in pull request #9047:
URL: https://github.com/apache/kafka/pull/9047#discussion_r458482803



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##
@@ -131,11 +135,40 @@ public void setGlobalProcessorContext(final 
InternalProcessorContext globalProce
 }
 
 final Set changelogTopics = new HashSet<>();
-for (final StateStore stateStore : globalStateStores) {
+
+long deadlineMs = NO_DEADLINE;
+final List storesToInitialize = new 
LinkedList<>(globalStateStores);
+
+while (!storesToInitialize.isEmpty()) {
+// we remove and add back on failure to round-robin through all 
stores
+final StateStore stateStore = storesToInitialize.remove(0);
 globalStoreNames.add(stateStore.name());
 final String sourceTopic = 
storeToChangelogTopic.get(stateStore.name());
 changelogTopics.add(sourceTopic);
-stateStore.init(globalProcessorContext, stateStore);
+
+try {
+stateStore.init(globalProcessorContext, stateStore);
+deadlineMs = NO_DEADLINE;
+} catch (final RetryableErrorException retryableException) {
+if (taskTimeoutMs == 0L) {
+throw new StreamsException(retryableException.getCause());

Review comment:
   If we rethrow, we get rid of the `RetryableErrorException` and pass in 
the original root cause, ie, the `TimeoutException`.
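The retry semantics discussed in this review can be sketched generically. This is a hedged Python illustration (not the Streams code; names are illustrative): with a task timeout of zero a retryable failure is rethrown immediately with its root cause unwrapped, otherwise the store is put back and retried round-robin until a deadline.

```python
import time

class RetryableError(Exception):
    pass

def init_stores(stores, init, task_timeout_s, clock=time.monotonic):
    """Initialize stores, retrying retryable failures until task_timeout_s."""
    deadline = None
    pending = list(stores)
    while pending:
        store = pending.pop(0)
        try:
            init(store)
            deadline = None                # progress: reset the deadline
        except RetryableError as e:
            if task_timeout_s == 0:
                raise e.__cause__          # unwrap: surface the root cause
            if deadline is None:
                deadline = clock() + task_timeout_s
            if clock() >= deadline:
                raise e.__cause__
            pending.append(store)          # round-robin: retry later

attempts = {"n": 0}
def flaky(store):
    attempts["n"] += 1
    if attempts["n"] < 3:                  # fail twice, then succeed
        err = RetryableError()
        err.__cause__ = TimeoutError("KABOOM!")
        raise err

init_stores(["global-store"], flaky, task_timeout_s=5.0)
print(attempts["n"])  # 3
```

The `raise e.__cause__` lines mirror the point above: callers see the original `TimeoutError`, not the retryable wrapper.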









[GitHub] [kafka] mjsax commented on a change in pull request #9047: KAFKA-9274: remove `retries` for global tasks

2020-07-21 Thread GitBox


mjsax commented on a change in pull request #9047:
URL: https://github.com/apache/kafka/pull/9047#discussion_r458482408



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
##
@@ -612,72 +617,521 @@ public boolean lockGlobalState() throws IOException {
 }
 }
 
-@SuppressWarnings("deprecation") // TODO revisit in follow up PR

Review comment:
   Second TODO









[GitHub] [kafka] mjsax commented on a change in pull request #9047: KAFKA-9274: remove `retries` for global tasks

2020-07-21 Thread GitBox


mjsax commented on a change in pull request #9047:
URL: https://github.com/apache/kafka/pull/9047#discussion_r458482045



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##
@@ -58,30 +60,33 @@
  * of Global State Stores. There is only ever 1 instance of this class per Application Instance.
  */
 public class GlobalStateManagerImpl implements GlobalStateManager {
+private final static long NO_DEADLINE = -1L;
+
 private final Logger log;
+private final Time time;
 private final Consumer<byte[], byte[]> globalConsumer;
 private final File baseDir;
 private final StateDirectory stateDirectory;
 private final Set<String> globalStoreNames = new HashSet<>();
 private final FixedOrderMap<String, Optional<StateStore>> globalStores = new FixedOrderMap<>();
 private final StateRestoreListener stateRestoreListener;
 private InternalProcessorContext globalProcessorContext;
-private final int retries;
-private final long retryBackoffMs;
 private final Duration pollTime;
+private final long taskTimeoutMs;
 private final Set<String> globalNonPersistentStoresTopics = new HashSet<>();
 private final OffsetCheckpoint checkpointFile;
 private final Map<TopicPartition, Long> checkpointFileCache;
 private final Map<String, String> storeToChangelogTopic;
 private final List<StateStore> globalStateStores;
 
-@SuppressWarnings("deprecation") // TODO: remove in follow up PR when `RETRIES` is removed

Review comment:
   This PR fixed 3 TODOs from the first PR. (The other two are in the test -- also added a comment to the original PR that links to this PR as reference.)









[GitHub] [kafka] mjsax commented on a change in pull request #9047: KAFKA-9274: remove `retries` for global tasks

2020-07-21 Thread GitBox


mjsax commented on a change in pull request #9047:
URL: https://github.com/apache/kafka/pull/9047#discussion_r458481794



##
File path: 
streams/src/main/java/org/apache/kafka/streams/errors/RetryableErrorException.java
##
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.errors;
+
+public class RetryableErrorException extends StreamsException {

Review comment:
   Added new exception type as requested.









[GitHub] [kafka] mjsax commented on a change in pull request #9047: KAFKA-9274: remove `retries` for global tasks

2020-07-21 Thread GitBox


mjsax commented on a change in pull request #9047:
URL: https://github.com/apache/kafka/pull/9047#discussion_r458481626



##
File path: docs/upgrade.html
##
@@ -23,8 +23,8 @@ Notable changes in 2
 
 The configuration parameter retries is deprecated for the producer, admin, and Kafka Streams clients
 via KIP-572 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-572%3A+Improve+timeouts+and+retries+in+Kafka+Streams).
-You should use the producer's delivery.timeout.ms, admin's default.api.timeout.ms, and
-Kafka Streams' new task.timeout.ms parameters instead.
+You should use the producer's delivery.timeout.ms and max.block.ms, admin's

Review comment:
   As above.

##
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##
@@ -357,7 +357,7 @@
 /** {@code commit.interval.ms} */
 @SuppressWarnings("WeakerAccess")
 public static final String COMMIT_INTERVAL_MS_CONFIG = 
"commit.interval.ms";
-private static final String COMMIT_INTERVAL_MS_DOC = "The frequency with 
which to save the position of the processor." +
+private static final String COMMIT_INTERVAL_MS_DOC = "The frequency in 
milliseconds with which to save the position of the processor." +

Review comment:
   Same fixes as in the docs.









[GitHub] [kafka] mjsax commented on a change in pull request #9047: KAFKA-9274: remove `retries` for global tasks

2020-07-21 Thread GitBox


mjsax commented on a change in pull request #9047:
URL: https://github.com/apache/kafka/pull/9047#discussion_r458481521



##
File path: docs/streams/upgrade-guide.html
##
@@ -95,11 +95,12 @@ Streams API
 
 
 
-The configuration parameter retries is deprecated in 
favor of a the new parameter task.timeout.ms.

Review comment:
   Fixed some typos and added a reference to the `max.block.ms` config.









[GitHub] [kafka] mjsax commented on a change in pull request #9047: KAFKA-9274: remove `retries` for global tasks

2020-07-21 Thread GitBox


mjsax commented on a change in pull request #9047:
URL: https://github.com/apache/kafka/pull/9047#discussion_r458481381



##
File path: docs/streams/developer-guide/config-streams.html
##
@@ -326,13 +321,18 @@ bootstrap.serversstate.cleanup.delay.ms
 Low
 The amount of time in milliseconds to wait before 
deleting state when a partition has migrated.
-600000 milliseconds
+600000 milliseconds (10 minutes)
   
   state.dir
 High
 Directory location for state stores.
 /tmp/kafka-streams
   
+  task.timeout.ms

Review comment:
   Forgot to add the new config in the first PR









[GitHub] [kafka] mjsax commented on a change in pull request #9047: KAFKA-9274: remove `retries` for global tasks

2020-07-21 Thread GitBox


mjsax commented on a change in pull request #9047:
URL: https://github.com/apache/kafka/pull/9047#discussion_r458481315



##
File path: docs/streams/developer-guide/config-streams.html
##
@@ -308,15 +308,10 @@ bootstrap.serversThe replication factor for changelog topics and 
repartition topics created by the application.
 1
   
-  retries

Review comment:
   Forgot to remove `retries` in the first PR.









[GitHub] [kafka] mjsax commented on a change in pull request #9047: KAFKA-9274: remove `retries` for global tasks

2020-07-21 Thread GitBox


mjsax commented on a change in pull request #9047:
URL: https://github.com/apache/kafka/pull/9047#discussion_r458481231



##
File path: docs/streams/developer-guide/config-streams.html
##
@@ -203,7 +203,7 @@ bootstrap.serverscommit.interval.ms
 Low
-The frequency with which to save the position (offsets in source topics) of tasks.
+The frequency in milliseconds with which to save the position (offsets in source topics) of tasks.

Review comment:
   Added a couple of side fixes for the docs.









[GitHub] [kafka] ableegoldman commented on a change in pull request #8964: KAFKA-9450: Decouple flushing state from commiting

2020-07-21 Thread GitBox


ableegoldman commented on a change in pull request #8964:
URL: https://github.com/apache/kafka/pull/8964#discussion_r458282421



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##
@@ -454,6 +456,41 @@ public void flush() {
 }
 }
 
+public void flushCache() {
+RuntimeException firstException = null;
+// attempting to flush the stores
+if (!stores.isEmpty()) {
+log.debug("Flushing all store caches registered in the state manager: {}", stores);
+for (final StateStoreMetadata metadata : stores.values()) {
+final StateStore store = metadata.stateStore;
+
+try {
+// buffer should be flushed to send all records to changelog
+if (store instanceof TimeOrderedKeyValueBuffer) {
+store.flush();
+} else if (store instanceof CachedStateStore) {
+((CachedStateStore) store).flushCache();
+}
+log.trace("Flushed cache or buffer {}", store.name());
+} catch (final RuntimeException exception) {
+if (firstException == null) {
+// do NOT wrap the error if it is actually caused by Streams itself
+if (exception instanceof StreamsException)
+firstException = exception;
+else
+firstException = new ProcessorStateException(

Review comment:
   nit: can you use braces here?
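The `flushCache` loop quoted above uses a common pattern: keep flushing the remaining stores even if one fails, remember only the first exception, and rethrow it after the loop so a single bad store cannot prevent the others from flushing. A self-contained sketch of that pattern under simplified types (the `Flushable` interface here is illustrative, not Kafka's):

```java
import java.util.List;

public class FlushAll {
    interface Flushable { String name(); void flush(); }

    // Flush every store; a failure in one must not prevent flushing the rest.
    // The first failure is remembered and rethrown once all stores were attempted.
    static void flushAll(final List<Flushable> stores) {
        RuntimeException firstException = null;
        for (final Flushable store : stores) {
            try {
                store.flush();
            } catch (final RuntimeException e) {
                if (firstException == null) {
                    firstException = e; // remember only the first failure
                }
            }
        }
        if (firstException != null) {
            throw firstException; // surface the first failure after all flushes ran
        }
    }
}
```

This mirrors the reviewer's point in the PR code: later failures are deliberately dropped in favor of the first one, which is usually the root cause.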

##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##
@@ -520,20 +557,27 @@ void transitionTaskType(final TaskType newType, final LogContext logContext) {
 log.debug("Transitioning state manager for {} task {} to {}", oldType, taskId, newType);
 }
 
-@Override
-public void checkpoint(final Map<TopicPartition, Long> writtenOffsets) {
-// first update each state store's current offset, then checkpoint
-// those stores that are only logged and persistent to the checkpoint file
+void updateChangelogOffsets(final Map<TopicPartition, Long> writtenOffsets) {
 for (final Map.Entry<TopicPartition, Long> entry : writtenOffsets.entrySet()) {
 final StateStoreMetadata store = findStore(entry.getKey());
 
 if (store != null) {
 store.setOffset(entry.getValue());
 
 log.debug("State store {} updated to written offset {} at changelog {}",
-store.stateStore.name(), store.offset, store.changelogPartition);
+store.stateStore.name(), store.offset, store.changelogPartition);
 }
 }
+}
+
+@Override
+public void checkpoint(final Map<TopicPartition, Long> writtenOffsets) {
+// first update each state store's current offset, then checkpoint
+// those stores that are only logged and persistent to the checkpoint file
+// TODO: we still need to keep it as part of the checkpoint for global tasks; this could be
+//   removed though when we consolidate global tasks / state managers into this one

Review comment:
   > TODO: we still need to keep it as part of the checkpoint for global tasks
   
   Took me a while to realize that "it" refers to the argument here -- can you clarify the comment?

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##
@@ -466,15 +458,8 @@ public void postCommit() {
  */
 partitionGroup.clear();

Review comment:
   This is also maybe beyond the scope of this PR, but it seems like there's no reason to do things like this anymore. Specifically, today we enforce the order `suspend` -> `pre/postCommit` -> `close`, where `suspend` only closes the topology, and we only use the `SUSPENDED` state to enforce that we suspended before closing. Why not swap the order so that we do `pre/postCommit` -> `suspend` -> `close`? Then we can move this call from `postCommit` to `suspend`, where it makes more sense. Thoughts?

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
##
@@ -49,6 +61,31 @@
 this.stateDirectory = stateDirectory;
 }
 
+protected void initializeCheckpoint() {
+// we will delete the local checkpoint file after registering the state stores and loading them into the
+// state manager, therefore we should initialize the snapshot as empty to indicate over-write checkpoint needed
+offsetSnapshotSinceLastFlush = Collections.emptyMap();
+}
+
+/**
+ * The following exceptions may be thrown from the state manager flushing call
+ *
+ * @throws TaskMigratedException recoverable error sending changelog records that would cause the task to be removed
+

[GitHub] [kafka] mumrah commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

2020-07-21 Thread GitBox


mumrah commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r458382935



##
File path: 
clients/src/main/java/org/apache/kafka/common/protocol/RecordsWriter.java
##
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.protocol;
+
+import org.apache.kafka.common.network.ByteBufferSend;
+import org.apache.kafka.common.network.Send;
+import org.apache.kafka.common.record.BaseRecords;
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.function.Consumer;
+
+/**
+ * Implementation of Writable which produces a sequence of {@link Send} objects. This allows for deferring the transfer
+ * of data from a record-set's file channel to the eventual socket channel.
+ *
+ * Excepting {@link #writeRecords(BaseRecords)}, calls to the write methods on this class will append to a byte array
+ * according to the format specified in {@link DataOutput}. When a call is made to writeRecords, any previously written
+ * bytes will be flushed as a new {@link ByteBufferSend} to the given Send consumer. After flushing the pending bytes,
+ * another Send is passed to the consumer which wraps the underlying record-set's transfer logic.
+ *
+ * For example,
+ *
+ * 
+ * recordsWritable.writeInt(10);
+ * recordsWritable.writeRecords(records1);
+ * recordsWritable.writeInt(20);
+ * recordsWritable.writeRecords(records2);
+ * recordsWritable.writeInt(30);
+ * recordsWritable.flush();
+ * 
+ *
+ * Will pass 5 Send objects to the consumer given in the constructor. Care must be taken by callers to flush any
+ * pending bytes at the end of the writing sequence to ensure everything is flushed to the consumer. This class is
+ * intended to be used with {@link org.apache.kafka.common.record.MultiRecordsSend}.
+ *
+ * @see org.apache.kafka.common.requests.FetchResponse
+ */
+public class RecordsWriter implements Writable {
+private final String dest;
+private final Consumer<Send> sendConsumer;
+private final ByteArrayOutputStream byteArrayOutputStream;
+private final DataOutput output;
+
+public RecordsWriter(String dest, Consumer<Send> sendConsumer) {
+this.dest = dest;
+this.sendConsumer = sendConsumer;
+this.byteArrayOutputStream = new ByteArrayOutputStream();
+this.output = new DataOutputStream(this.byteArrayOutputStream);
+}
+
+@Override
+public void writeByte(byte val) {
+writeQuietly(() -> output.writeByte(val));
+}
+
+@Override
+public void writeShort(short val) {
+writeQuietly(() -> output.writeShort(val));
+}
+
+@Override
+public void writeInt(int val) {
+writeQuietly(() -> output.writeInt(val));
+}
+
+@Override
+public void writeLong(long val) {
+writeQuietly(() -> output.writeLong(val));
+
+}
+
+@Override
+public void writeDouble(double val) {
+writeQuietly(() -> ByteUtils.writeDouble(val, output));
+
+}
+
+@Override
+public void writeByteArray(byte[] arr) {
+writeQuietly(() -> output.write(arr));
+}
+
+@Override
+public void writeUnsignedVarint(int i) {
+writeQuietly(() -> ByteUtils.writeUnsignedVarint(i, output));
+}
+
+@Override
+public void writeByteBuffer(ByteBuffer src) {
+writeQuietly(() -> output.write(src.array(), src.position(), src.remaining()));
+}
+
+@FunctionalInterface
+private interface IOExceptionThrowingRunnable {
+void run() throws IOException;
+}
+
+private void writeQuietly(IOExceptionThrowingRunnable runnable) {
+try {
+runnable.run();
+} catch (IOException e) {
+throw new RuntimeException("Writable encountered an IO error", e);
+}
+}
+
+@Override
+public void writeRecords(BaseRecords records) {
+flush();
+sendConsumer.accept(records.toSend(dest));
+}
+
+/**
+ * Flush any pending bytes as a ByteBufferSend and reset the buffer
+ *

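The `writeQuietly` helper in the diff above relies on a standard Java idiom: a custom functional interface whose single method declares `IOException`, so each write method can pass a throwing lambda through one translation point instead of repeating try/catch at every call site. A self-contained sketch of the idiom follows; the class and method names here are illustrative, not Kafka's actual API.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class QuietWriter {
    // A Runnable look-alike that is allowed to throw IOException,
    // so DataOutput calls can be passed as lambdas.
    @FunctionalInterface
    interface IORunnable { void run() throws IOException; }

    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    private final DataOutput output = new DataOutputStream(buffer);

    private void writeQuietly(final IORunnable runnable) {
        try {
            runnable.run();
        } catch (final IOException e) {
            // translate the checked exception once, at a single choke point
            throw new UncheckedIOException("writer encountered an IO error", e);
        }
    }

    public void writeInt(final int val) { writeQuietly(() -> output.writeInt(val)); }
    public void writeByte(final byte val) { writeQuietly(() -> output.writeByte(val)); }

    public byte[] toByteArray() { return buffer.toByteArray(); }
}
```

Since a `ByteArrayOutputStream` can never actually fail on write, the catch branch is effectively dead here; the idiom earns its keep when the underlying `DataOutput` can really throw.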
[GitHub] [kafka] abbccdda commented on a change in pull request #9012: KAFKA-10270: A broker to controller channel manager

2020-07-21 Thread GitBox


abbccdda commented on a change in pull request #9012:
URL: https://github.com/apache/kafka/pull/9012#discussion_r458356891



##
File path: 
core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
##
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue, TimeUnit}
+
+import kafka.common.{InterBrokerSendThread, RequestAndCompletionHandler}
+import kafka.network.RequestChannel
+import kafka.utils.Logging
+import org.apache.kafka.clients._
+import org.apache.kafka.common.requests.AbstractRequest
+import org.apache.kafka.common.utils.{LogContext, Time}
+import org.apache.kafka.common.Node
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.network._
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.AbstractRequest.NoOpRequestBuilder
+import org.apache.kafka.common.security.JaasContext
+
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+/**
+ * This class manages the connection between a broker and the controller. It runs a single
+ * {@link BrokerToControllerRequestThread} which uses the broker's metadata cache as its own metadata to find
+ * and connect to the controller. The channel is async and runs the network connection in the background.
+ * The maximum number of in-flight requests is set to one to ensure orderly responses from the controller; therefore
+ * care must be taken to not block on outstanding requests for too long.
+ */
+class BrokerToControllerChannelManager(metadataCache: kafka.server.MetadataCache,
+   time: Time,
+   metrics: Metrics,
+   config: KafkaConfig,
+   threadNamePrefix: Option[String] = None) extends Logging {
+  private val requestQueue = new LinkedBlockingQueue[BrokerToControllerQueueItem]
+  private val logContext = new LogContext(s"[broker-${config.brokerId}-to-controller] ")
+  private val manualMetadataUpdater = new ManualMetadataUpdater()
+  private val requestThread = newRequestThread
+
+  def start(): Unit = {
+requestThread.start()
+  }
+
+  def shutdown(): Unit = {
+requestThread.shutdown()
+requestThread.awaitShutdown()
+  }
+
+  private[server] def newRequestThread = {
+val brokerToControllerListenerName = 
config.controlPlaneListenerName.getOrElse(config.interBrokerListenerName)
+val brokerToControllerSecurityProtocol = 
config.controlPlaneSecurityProtocol.getOrElse(config.interBrokerSecurityProtocol)
+
+val networkClient = {
+  val channelBuilder = ChannelBuilders.clientChannelBuilder(
+brokerToControllerSecurityProtocol,
+JaasContext.Type.SERVER,
+config,
+brokerToControllerListenerName,
+config.saslMechanismInterBrokerProtocol,
+time,
+config.saslInterBrokerHandshakeRequestEnable,
+logContext
+  )
+  val selector = new Selector(
+NetworkReceive.UNLIMITED,
+Selector.NO_IDLE_TIMEOUT_MS,
+metrics,
+time,
+"BrokerToControllerChannel",
+Map("BrokerId" -> config.brokerId.toString).asJava,
+false,
+channelBuilder,
+logContext
+  )
+  new NetworkClient(
+selector,
+manualMetadataUpdater,
+config.brokerId.toString,
+1,
+0,
+0,
+Selectable.USE_DEFAULT_BUFFER_SIZE,
+Selectable.USE_DEFAULT_BUFFER_SIZE,
+config.requestTimeoutMs,
+config.connectionSetupTimeoutMs,
+config.connectionSetupTimeoutMaxMs,
+ClientDnsLookup.DEFAULT,
+time,
+false,
+new ApiVersions,
+logContext
+  )
+}
+val threadName = threadNamePrefix match {
+  case None => s"broker-${config.brokerId}-to-controller-send-thread"
+  case Some(name) => 
s"$name:broker-${config.brokerId}-to-controller-send-thread"
+}
+
+new BrokerToControllerRequestThread(networkClient, manualMetadataUpdater, 
requestQueue, metadataCache, config,
+  brokerToControllerListene

[GitHub] [kafka] abbccdda commented on a change in pull request #9012: KAFKA-10270: A broker to controller channel manager

2020-07-21 Thread GitBox


abbccdda commented on a change in pull request #9012:
URL: https://github.com/apache/kafka/pull/9012#discussion_r458356772



##
File path: 
core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
##
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue, TimeUnit}
+
+import kafka.common.{InterBrokerSendThread, RequestAndCompletionHandler}
+import kafka.network.RequestChannel
+import kafka.utils.Logging
+import org.apache.kafka.clients._
+import org.apache.kafka.common.requests.AbstractRequest
+import org.apache.kafka.common.utils.{LogContext, Time}
+import org.apache.kafka.common.Node
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.network._
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.AbstractRequest.NoOpRequestBuilder
+import org.apache.kafka.common.security.JaasContext
+
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+/**
+ * This class manages the connection between a broker and the controller. It runs a single
+ * {@link BrokerToControllerRequestThread} which uses the broker's metadata cache as its own metadata to find
+ * and connect to the controller. The channel is async and runs the network connection in the background.
+ * The maximum number of in-flight requests is set to one to ensure orderly responses from the controller; therefore
+ * care must be taken to not block on outstanding requests for too long.
+ */
+class BrokerToControllerChannelManager(metadataCache: kafka.server.MetadataCache,
+   time: Time,
+   metrics: Metrics,
+   config: KafkaConfig,
+   threadNamePrefix: Option[String] = None) extends Logging {
+  private val requestQueue = new LinkedBlockingQueue[BrokerToControllerQueueItem]
+  private val logContext = new LogContext(s"[broker-${config.brokerId}-to-controller] ")
+  private val manualMetadataUpdater = new ManualMetadataUpdater()
+  private val requestThread = newRequestThread
+
+  def start(): Unit = {
+requestThread.start()
+  }
+
+  def shutdown(): Unit = {
+requestThread.shutdown()
+requestThread.awaitShutdown()
+  }
+
+  private[server] def newRequestThread = {
+val brokerToControllerListenerName = 
config.controlPlaneListenerName.getOrElse(config.interBrokerListenerName)
+val brokerToControllerSecurityProtocol = 
config.controlPlaneSecurityProtocol.getOrElse(config.interBrokerSecurityProtocol)
+
+val networkClient = {
+  val channelBuilder = ChannelBuilders.clientChannelBuilder(
+brokerToControllerSecurityProtocol,
+JaasContext.Type.SERVER,
+config,
+brokerToControllerListenerName,
+config.saslMechanismInterBrokerProtocol,
+time,
+config.saslInterBrokerHandshakeRequestEnable,
+logContext
+  )
+  val selector = new Selector(
+NetworkReceive.UNLIMITED,
+Selector.NO_IDLE_TIMEOUT_MS,
+metrics,
+time,
+"BrokerToControllerChannel",
+Map("BrokerId" -> config.brokerId.toString).asJava,
+false,
+channelBuilder,
+logContext
+  )
+  new NetworkClient(
+selector,
+manualMetadataUpdater,
+config.brokerId.toString,
+1,
+0,
+0,
+Selectable.USE_DEFAULT_BUFFER_SIZE,
+Selectable.USE_DEFAULT_BUFFER_SIZE,
+config.requestTimeoutMs,
+config.connectionSetupTimeoutMs,
+config.connectionSetupTimeoutMaxMs,
+ClientDnsLookup.DEFAULT,
+time,
+false,
+new ApiVersions,
+logContext
+  )
+}
+val threadName = threadNamePrefix match {
+  case None => s"broker-${config.brokerId}-to-controller-send-thread"
+  case Some(name) => 
s"$name:broker-${config.brokerId}-to-controller-send-thread"
+}
+
+new BrokerToControllerRequestThread(networkClient, manualMetadataUpdater, 
requestQueue, metadataCache, config,
+  brokerToControllerListene

[GitHub] [kafka] mjsax commented on a change in pull request #8864: KAFKA-9274: Mark `retries` config as deprecated and add new `task.timeout.ms` config

2020-07-21 Thread GitBox


mjsax commented on a change in pull request #8864:
URL: https://github.com/apache/kafka/pull/8864#discussion_r458346124



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
##
@@ -612,6 +612,7 @@ public boolean lockGlobalState() throws IOException {
 }
 }
 
+@SuppressWarnings("deprecation") // TODO revisit in follow up PR

Review comment:
   Addressed via #9047

##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
##
@@ -645,6 +646,7 @@ public void 
shouldRetryWhenEndOffsetsThrowsTimeoutException() {
 }
 }
 
+@SuppressWarnings("deprecation") // TODO revisit in follow up PR

Review comment:
   Addressed via #9047









[GitHub] [kafka] mjsax commented on a change in pull request #8864: KAFKA-9274: Mark `retries` config as deprecated and add new `task.timeout.ms` config

2020-07-21 Thread GitBox


mjsax commented on a change in pull request #8864:
URL: https://github.com/apache/kafka/pull/8864#discussion_r458345947



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##
@@ -74,6 +74,7 @@
 private final OffsetCheckpoint checkpointFile;
 private final Map checkpointFileCache;
 
+@SuppressWarnings("deprecation") // TODO: remove in follow up PR when 
`RETRIES` is removed

Review comment:
   Addressed via #9047









[jira] [Created] (KAFKA-10298) Replace Windows with a proper interface

2020-07-21 Thread John Roesler (Jira)
John Roesler created KAFKA-10298:


 Summary: Replace Windows with a proper interface
 Key: KAFKA-10298
 URL: https://issues.apache.org/jira/browse/KAFKA-10298
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: John Roesler
Assignee: John Roesler


See POC: [https://github.com/apache/kafka/pull/9031]

 

Presently, windowed aggregations in KafkaStreams fall into two categories:
 * Windows
 ** TimeWindows
 ** UnlimitedWindows
 ** JoinWindows
 * SessionWindows

Unfortunately, Windows is an abstract class instead of an interface, and it 
forces some fields onto its implementations. This has led to a number of 
problems over the years, but so far we have been able to live with them.

However, as we consider adding new implementations to this side of the 
hierarchy, the damage will spread. See KIP-450, for example.

We should take the opportunity now to correct the issue by introducing an 
interface and deprecating Windows itself. Then, we can implement new features 
cleanly and maybe remove Windows in the 3.0 release.
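For illustration only, the proposed direction could look roughly like the sketch below: introduce an interface carrying only behavior, retrofit the deprecated abstract class onto it, and let new window types implement the interface without inheriting fields. All names here are hypothetical; see the linked POC PR for the actual proposal.

```java
// Hypothetical sketch of migrating an abstract class to an interface while
// keeping the old type for compatibility; NOT the actual Kafka Streams API.
public class WindowHierarchy {
    // New interface: only behavior, no inherited fields.
    interface WindowDefinition {
        long windowSize();
        long gracePeriodMs();
    }

    // Old abstract class, kept temporarily and retrofitted onto the interface.
    // Its public field is exactly the kind of forced state the Jira describes.
    @Deprecated
    static abstract class OldWindows implements WindowDefinition {
        public final long sizeMs; // forced field: the original design problem

        protected OldWindows(final long sizeMs) { this.sizeMs = sizeMs; }

        @Override
        public long windowSize() { return sizeMs; }
    }

    // New implementations depend only on the interface and own their state.
    static final class TumblingWindow implements WindowDefinition {
        private final long sizeMs;
        private final long graceMs;

        TumblingWindow(final long sizeMs, final long graceMs) {
            this.sizeMs = sizeMs;
            this.graceMs = graceMs;
        }

        @Override public long windowSize() { return sizeMs; }
        @Override public long gracePeriodMs() { return graceMs; }
    }
}
```

The payoff is that features like KIP-450 sliding windows can implement `WindowDefinition` directly, instead of inheriting fields they do not need from the abstract class.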



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Issue Comment Deleted] (KAFKA-7106) Remove segment/segmentInterval from Window definition

2020-07-21 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7106?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-7106:

Comment: was deleted

(was: guozhangwang commented on pull request #7670: KAFKA-7106: Not hide the 
stack trace for ApiException
URL: https://github.com/apache/kafka/pull/7670
 
 
   Unit tests passed locally, do not think it has any compatibility breakage.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
)

> Remove segment/segmentInterval from Window definition
> -
>
> Key: KAFKA-7106
> URL: https://issues.apache.org/jira/browse/KAFKA-7106
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Blocker
>  Labels: needs-kip
> Fix For: 3.0.0
>
>
> Currently, Window configures segment and segmentInterval properties, but 
> these aren't truly properties of a window in general.
> Rather, they are properties of the particular implementation that we 
> currently have: a segmented store. Therefore, these properties should be 
> moved to configure only that implementation.
>  
> This may be related to KAFKA-4730, since an in-memory window store wouldn't 
> necessarily need to be segmented.
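As a rough illustration of the separation the ticket asks for (all names are hypothetical, not the actual Streams API): the window definition keeps only windowing semantics, while retention and segment interval belong to the configuration of the segmented store implementation.

```java
// Hypothetical sketch: segmentation parameters live with the (segmented)
// store configuration, not with the window definition. Names are
// illustrative only.
public class SegmentConfigSketch {

    // Window semantics only: no segment/segmentInterval here.
    static final class TimeWindowDef {
        final long sizeMs;

        TimeWindowDef(final long sizeMs) {
            this.sizeMs = sizeMs;
        }
    }

    // Implementation detail of the segmented store, configured separately.
    static final class SegmentedStoreConfig {
        final long retentionMs;
        final long segmentIntervalMs;

        SegmentedStoreConfig(final long retentionMs, final long segmentIntervalMs) {
            this.retentionMs = retentionMs;
            this.segmentIntervalMs = segmentIntervalMs;
        }

        long segments() {
            // Number of segments needed to cover the retention period.
            return Math.max(1, retentionMs / segmentIntervalMs);
        }
    }

    public static void main(final String[] args) {
        final TimeWindowDef window = new TimeWindowDef(60_000L);
        final SegmentedStoreConfig store = new SegmentedStoreConfig(3_600_000L, 600_000L);
        System.out.println(window.sizeMs + " " + store.segments()); // 60000 6
    }
}
```

An in-memory window store could then reuse `TimeWindowDef` without any segment configuration at all.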





[GitHub] [kafka] mjsax merged pull request #8864: KAFKA-9274: Mark `retries` config as deprecated and add new `task.timeout.ms` config

2020-07-21 Thread GitBox


mjsax merged pull request #8864:
URL: https://github.com/apache/kafka/pull/8864


   







[GitHub] [kafka] mjsax commented on pull request #8864: KAFKA-9274: Mark `retries` config as deprecated and add new `task.timeout.ms` config

2020-07-21 Thread GitBox


mjsax commented on pull request #8864:
URL: https://github.com/apache/kafka/pull/8864#issuecomment-662055495


   Thanks @vvcephei -- I don't think we need tickets. I will use this PR itself 
as a reference to make sure we address all TODOs that are added.







[GitHub] [kafka] jamiealquiza commented on pull request #1596: KAFKA-1543: Change replication factor during partition map generation

2020-07-21 Thread GitBox


jamiealquiza commented on pull request #1596:
URL: https://github.com/apache/kafka/pull/1596#issuecomment-662051029


   Found this issue while browsing for something else, but fwiw my 
[topicmappr](https://github.com/DataDog/kafka-kit/tree/master/cmd/topicmappr) 
cli tool does exactly this: `topicmappr rebuild --topics  --brokers 
-1 --replication n`. Creates a compatible file that can now be assigned or 
otherwise worked with.







[GitHub] [kafka] abbccdda opened a new pull request #9049: MINOR: fix scala warnings

2020-07-21 Thread GitBox


abbccdda opened a new pull request #9049:
URL: https://github.com/apache/kafka/pull/9049


   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[jira] [Updated] (KAFKA-10271) Performance regression while fetching a key from a single partition

2020-07-21 Thread Dima R (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10271?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dima R updated KAFKA-10271:
---
Summary: Performance regression while fetching a key from a single 
partition  (was: Performance degradation while fetching a key from a single 
partition)

> Performance regression while fetching a key from a single partition
> ---
>
> Key: KAFKA-10271
> URL: https://issues.apache.org/jira/browse/KAFKA-10271
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.5.0, 2.6.0, 2.5.1
>Reporter: Dima R
>Assignee: Dima R
>Priority: Major
>  Labels: KAFKA-10030, KAFKA-9445, KIP-562
> Fix For: 2.5.2, 2.6.1
>
> Attachments: 9020.png
>
>
> This is a follow-up bug for KAFKA-10030.
> StreamThreadStateStoreProvider loops excessively over calls to 
> internalTopologyBuilder.topicGroups(), which is synchronized, causing 
> significant performance degradation for the caller, especially when the store 
> has many partitions.
>  
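The regression pattern can be sketched with a stand-in for the synchronized `topicGroups()` call (names and structure are illustrative, not the actual `StreamThreadStateStoreProvider` code): hoisting the synchronized call out of the per-partition loop removes the contention.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of the regression pattern: a synchronized method
// called once per partition serializes all callers; hoisting the call
// out of the loop removes the contention.
public class HoistSynchronizedCallSketch {

    static int calls = 0;

    // Stand-in for the synchronized internalTopologyBuilder.topicGroups().
    static synchronized List<String> topicGroups() {
        calls++;
        return List.of("group-0", "group-1");
    }

    // Before: one synchronized call per partition (the regression).
    static List<String> perPartitionLookup(final int partitions) {
        final List<String> out = new ArrayList<>();
        for (int p = 0; p < partitions; p++) {
            out.addAll(topicGroups());
        }
        return out;
    }

    // After: a single synchronized call, reused for every partition.
    static List<String> hoistedLookup(final int partitions) {
        final List<String> groups = topicGroups();
        final List<String> out = new ArrayList<>();
        for (int p = 0; p < partitions; p++) {
            out.addAll(groups);
        }
        return out;
    }

    public static void main(final String[] args) {
        perPartitionLookup(100);
        System.out.println(calls); // 100
        calls = 0;
        hoistedLookup(100);
        System.out.println(calls); // 1
    }
}
```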





[GitHub] [kafka] kkonstantine opened a new pull request #9048: WIP: Replace org.reflections with classgraph for class scanning in Connect

2020-07-21 Thread GitBox


kkonstantine opened a new pull request #9048:
URL: https://github.com/apache/kafka/pull/9048


   https://github.com/classgraph/classgraph is a more recent and more actively 
developed classpath scanning library. 
   
   I'll be doing some testing and performance benchmarking before I open this 
PR for general review. Opening as draft atm. 
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[jira] [Updated] (KAFKA-10297) Don't use deprecated producer config `retries`

2020-07-21 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10297?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-10297:

Description: 
In the 2.7.0 release, the producer config `retries` is deprecated via KIP-572.

Connect is still using this config, which needs to be fixed (cf 
[https://github.com/apache/kafka/pull/8864/files#r439685920])
{quote}Btw: @hachikuji raised a concern about this issue, too: 
https://github.com/apache/kafka/pull/8864#pullrequestreview-443424531

> I just had one question about the proposal. Using retries=0 in the producer 
> allows the user to have "at-most-once" delivery. This allows the application 
> to implement its own retry logic for example. Do we still have a way to do 
> this once this configuration is gone?

So maybe we need to do some follow up work in the `Producer` to make it work 
for Connect. But I would defer this to the follow up work.

My original thought was that setting `delivery.timeout.ms := request.timeout.ms` 
might prevent internal retries. But we need to verify this. It was also brought 
to my attention that this might not work if the network disconnects -- only 
`retries=0` would prevent re-opening the connection, but a low timeout would not 
prevent retries.

In KIP-572, we proposed for Kafka Streams itself, to treat `task.timeout.ms = 
0` as "no retries" -- maybe we can do a similar thing for the producer?

There is also `max.block.ms` that we should consider. Unfortunately, I am not 
an expert on the `Producer`. But I would like to move forward with KIP-572 for 
now and am happy to help resolve those questions.
{quote}

  was:
In the 2.7.0 release, the producer config `retries` is deprecated via KIP-572.

Connect is still using this config, which needs to be fixed.


> Don't use deprecated producer config `retries`
> --
>
> Key: KAFKA-10297
> URL: https://issues.apache.org/jira/browse/KAFKA-10297
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 2.7.0
>Reporter: Matthias J. Sax
>Priority: Blocker
> Fix For: 2.7.0
>
>
> In the 2.7.0 release, the producer config `retries` is deprecated via KIP-572.
> Connect is still using this config, which needs to be fixed (cf 
> [https://github.com/apache/kafka/pull/8864/files#r439685920])
> {quote}Btw: @hachikuji raised a concern about this issue, too: 
> https://github.com/apache/kafka/pull/8864#pullrequestreview-443424531
> > I just had one question about the proposal. Using retries=0 in the producer 
> > allows the user to have "at-most-once" delivery. This allows the 
> > application to implement its own retry logic for example. Do we still have 
> > a way to do this once this configuration is gone?
> So maybe we need to do some follow up work in the `Producer` to make it work 
> for Connect. But I would defer this to the follow up work.
> My original thought was that setting `delivery.timeout.ms := 
> request.timeout.ms` might prevent internal retries. But we need to verify 
> this. It was also brought to my attention that this might not work if the 
> network disconnects -- only `retries=0` would prevent re-opening the 
> connection, but a low timeout would not prevent retries.
> In KIP-572, we proposed for Kafka Streams itself, to treat `task.timeout.ms = 
> 0` as "no retries" -- maybe we can do a similar thing for the producer?
> There is also `max.block.ms` that we should consider. Unfortunately, I am not 
> an expert on the `Producer`. But I would like to move forward with KIP-572 
> for now and am happy to help resolve those questions.
> {quote}





[GitHub] [kafka] mjsax commented on a change in pull request #8864: KAFKA-9274: Mark `retries` config as deprecated and add new `task.timeout.ms` config

2020-07-21 Thread GitBox


mjsax commented on a change in pull request #8864:
URL: https://github.com/apache/kafka/pull/8864#discussion_r458266432



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
##
@@ -148,6 +148,7 @@ public KafkaStatusBackingStore(Time time, Converter 
converter) {
 this.statusTopic = statusTopic;
 }
 
+@SuppressWarnings("deprecation")

Review comment:
   Btw: @hachikuji raised a concern about this issue, too: 
https://github.com/apache/kafka/pull/8864#pullrequestreview-443424531
   
   > I just had one question about the proposal. Using retries=0 in the 
producer allows the user to have "at-most-once" delivery. This allows the 
application to implement its own retry logic for example. Do we still have a 
way to do this once this configuration is gone?
   
   So maybe we need to do some follow up work in the `Producer` to make it work 
for Connect. But I would defer this to the follow up work.
   
   My original thought was that setting `delivery.timeout.ms := 
request.timeout.ms` might prevent internal retries. But we need to verify this. 
It was also brought to my attention that this might not work if the network 
disconnects -- only `retries=0` would prevent re-opening the connection, but a 
low timeout would not prevent retries.
   
   In KIP-572, we proposed for Kafka Streams itself, to treat `task.timeout.ms 
= 0` as "no retries" -- maybe we can do a similar thing for the producer?
   
   There is also `max.block.ms` that we should consider. Unfortunately, I am 
not an expert on the `Producer`. But I would like to move forward with KIP-572 
for now and am happy to help resolve those questions.
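A rough sketch of the two configurations discussed here, using plain `Properties` with the real producer config names (`retries`, `request.timeout.ms`, `delivery.timeout.ms`); whether the timeout-based form actually prevents all internal retries is exactly the open question raised in this thread, so it is marked as unverified in the comments.

```java
import java.util.Properties;

// Sketch of the two configurations discussed above, using plain Properties
// with the real producer config names. Whether the timeout-based form truly
// prevents all internal retries is the open question in this thread.
public class AtMostOnceConfigSketch {

    // Explicit at-most-once: the producer never retries internally, so the
    // application can implement its own retry policy.
    public static Properties atMostOnce() {
        final Properties props = new Properties();
        props.put("retries", "0");
        return props;
    }

    // Possible substitute once `retries` is deprecated: cap the total
    // delivery time at a single request timeout. As noted in the thread,
    // this may NOT prevent retries after a network disconnect (unverified).
    public static Properties timeoutBounded() {
        final Properties props = new Properties();
        props.put("request.timeout.ms", "30000");
        props.put("delivery.timeout.ms", "30000");
        return props;
    }

    public static void main(final String[] args) {
        System.out.println(atMostOnce().getProperty("retries")); // 0
    }
}
```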









[GitHub] [kafka] rajinisivaram commented on pull request #8933: KAFKA-10163; Throttle Create Topic, Create Partition and Delete Topic Operations (KIP-599, Part I, Broker Changes)

2020-07-21 Thread GitBox


rajinisivaram commented on pull request #8933:
URL: https://github.com/apache/kafka/pull/8933#issuecomment-661986912


   retest this please







[jira] [Comment Edited] (KAFKA-7516) Client (Producer and/or Consumer) crashes during initialization on Android

2020-07-21 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-7516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17162167#comment-17162167
 ] 

Paulo César edited comment on KAFKA-7516 at 7/21/20, 4:48 PM:
--

Has there been any progress on this issue?


was (Author: pccesar):
Has there been any progress on this issue?

> Client (Producer and/or Consumer) crashes during initialization on Android
> --
>
> Key: KAFKA-7516
> URL: https://issues.apache.org/jira/browse/KAFKA-7516
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 2.0.0
>Reporter: alex kamenetsky
>Priority: Major
>
> Attempt to incorporate kafka client (both Producer and Consumer) on Android 
> Dalvik fails during initialization stage: Dalvik doesn't support 
> javax.management (JMX).





[jira] [Commented] (KAFKA-7516) Client (Producer and/or Consumer) crashes during initialization on Android

2020-07-21 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-7516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17162167#comment-17162167
 ] 

Paulo César commented on KAFKA-7516:


Has there been any progress on this issue?

> Client (Producer and/or Consumer) crashes during initialization on Android
> --
>
> Key: KAFKA-7516
> URL: https://issues.apache.org/jira/browse/KAFKA-7516
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 2.0.0
>Reporter: alex kamenetsky
>Priority: Major
>
> Attempt to incorporate kafka client (both Producer and Consumer) on Android 
> Dalvik fails during initialization stage: Dalvik doesn't support 
> javax.management (JMX).





[GitHub] [kafka] rajinisivaram commented on pull request #8768: KAFKA-10023: Enforce broker-wide and per-listener connection creation…

2020-07-21 Thread GitBox


rajinisivaram commented on pull request #8768:
URL: https://github.com/apache/kafka/pull/8768#issuecomment-661935552


   There is one test failure in 
`kafka.network.DynamicConnectionQuotaTest.testDynamicListenerConnectionCreationRateQuota`
 that is related to the PR.







[GitHub] [kafka] vvcephei commented on a change in pull request #9047: KAFKA-9274: remove `retries` for global tasks

2020-07-21 Thread GitBox


vvcephei commented on a change in pull request #9047:
URL: https://github.com/apache/kafka/pull/9047#discussion_r458173651



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##
@@ -184,32 +217,21 @@ public void registerStore(final StateStore store, final 
StateRestoreCallback sta
 
 log.info("Restoring state for global store {}", store.name());
final List<TopicPartition> topicPartitions = topicPartitionsForStore(store);
-Map<TopicPartition, Long> highWatermarks = null;
 
-int attempts = 0;
-while (highWatermarks == null) {
-try {
-highWatermarks = globalConsumer.endOffsets(topicPartitions);
-} catch (final TimeoutException retryableException) {
-if (++attempts > retries) {
-log.error("Failed to get end offsets for topic partitions 
of global store {} after {} retry attempts. " +
-"You can increase the number of retries via 
configuration parameter `retries`.",
-store.name(),
-retries,
-retryableException);
-throw new StreamsException(String.format("Failed to get 
end offsets for topic partitions of global store %s after %d retry attempts. " +
-"You can increase the number of retries via 
configuration parameter `retries`.", store.name(), retries),
-retryableException);
-}
-log.debug("Failed to get end offsets for partitions {}, 
backing off for {} ms to retry (attempt {} of {})",
-topicPartitions,
-retryBackoffMs,
-attempts,
-retries,
-retryableException);
-Utils.sleep(retryBackoffMs);
-}
+final Map<TopicPartition, Long> highWatermarks;
+try {
+highWatermarks = globalConsumer.endOffsets(topicPartitions);
+} catch (final TimeoutException retryableException) {
+log.debug(
+"Failed to get end offsets for partitions {}. The broker may 
be transiently unavailable at the moment. Will retry.",
+topicPartitions,
+retryableException

Review comment:
   The varargs version of `debug` does _not_ take a cause at the end. This 
exception will not be logged. You have to use the version that only takes 
`(String, Exception)`.
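The review point can be illustrated with a minimal stand-in logger (this mimics the overload distinction only; it is not the actual SLF4J implementation Kafka uses): the varargs method treats a trailing exception as just another format argument, while the `(String, Throwable)` overload carries the cause, so the suggested fix is to pre-format the message and pass the exception via the two-argument overload.

```java
// Minimal stand-in logger illustrating the overload distinction (NOT the
// actual SLF4J implementation): the varargs method renders every argument,
// including an exception, as a plain format argument, while the
// (String, Throwable) overload attaches the cause.
public class LoggerOverloadSketch {

    // Varargs version: substitutes each argument into "{}" placeholders.
    static String debug(final String format, final Object... args) {
        String msg = format;
        for (final Object arg : args) {
            msg = msg.replaceFirst("\\{}", String.valueOf(arg));
        }
        return msg;
    }

    // Two-argument overload: the cause is attached to the log entry
    // (a real logger would also record the stack trace).
    static String debug(final String message, final Throwable cause) {
        return message + "\n" + cause;
    }

    public static void main(final String[] args) {
        final Exception cause = new RuntimeException("timeout");
        // The suggested form: pre-format the message, pass the cause last,
        // so Java resolves to the (String, Throwable) overload.
        System.out.println(debug("Failed to get end offsets for [t-0]", cause));
    }
}
```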

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##
@@ -184,32 +217,21 @@ public void registerStore(final StateStore store, final 
StateRestoreCallback sta
 
 log.info("Restoring state for global store {}", store.name());
final List<TopicPartition> topicPartitions = topicPartitionsForStore(store);
-Map<TopicPartition, Long> highWatermarks = null;
 
-int attempts = 0;
-while (highWatermarks == null) {
-try {
-highWatermarks = globalConsumer.endOffsets(topicPartitions);
-} catch (final TimeoutException retryableException) {
-if (++attempts > retries) {
-log.error("Failed to get end offsets for topic partitions 
of global store {} after {} retry attempts. " +
-"You can increase the number of retries via 
configuration parameter `retries`.",
-store.name(),
-retries,
-retryableException);
-throw new StreamsException(String.format("Failed to get 
end offsets for topic partitions of global store %s after %d retry attempts. " +
-"You can increase the number of retries via 
configuration parameter `retries`.", store.name(), retries),
-retryableException);
-}
-log.debug("Failed to get end offsets for partitions {}, 
backing off for {} ms to retry (attempt {} of {})",
-topicPartitions,
-retryBackoffMs,
-attempts,
-retries,
-retryableException);
-Utils.sleep(retryBackoffMs);
-}
+final Map<TopicPartition, Long> highWatermarks;
+try {
+highWatermarks = globalConsumer.endOffsets(topicPartitions);
+} catch (final TimeoutException retryableException) {
+log.debug(
+"Failed to get end offsets for partitions {}. The broker may 
be transiently unavailable at the moment. Will retry.",
+topicPartitions,
+retryableException
+);
+
+// handled in `GlobalStateMangerImpl#initialize()`
+throw retryableException;

Review comment:
   Minor note: it's confusing to track down exceptions when they are 
re-thrown like this, because the stacktrace would only reference L223. Even 
though the

[GitHub] [kafka] vvcephei commented on pull request #9047: KAFKA-9274: remove `retries` for global tasks

2020-07-21 Thread GitBox


vvcephei commented on pull request #9047:
URL: https://github.com/apache/kafka/pull/9047#issuecomment-661924465


   Test failures:
   ```
   kafka.api.PlaintextAdminIntegrationTest.testAlterReplicaLogDirs failed, log 
available in 
/home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.13/core/build/reports/testOutput/kafka.api.PlaintextAdminIntegrationTest.testAlterReplicaLogDirs.test.stdout
   
   kafka.api.PlaintextAdminIntegrationTest > testAlterReplicaLogDirs FAILED
   org.scalatest.exceptions.TestFailedException: only 0 messages are 
produced within timeout after replica movement. Producer future 
Some(Failure(java.util.concurrent.TimeoutException: Timeout after waiting for 
1 ms.))
   at 
org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:530)
   at 
org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:529)
   at 
org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1389)
   at org.scalatest.Assertions.fail(Assertions.scala:1091)
   at org.scalatest.Assertions.fail$(Assertions.scala:1087)
   at org.scalatest.Assertions$.fail(Assertions.scala:1389)
   at 
kafka.api.PlaintextAdminIntegrationTest.testAlterReplicaLogDirs(PlaintextAdminIntegrationTest.scala:289)
   ```
   
   ```
   
org.apache.kafka.connect.integration.SourceConnectorsIntegrationTest.testTopicsAreCreatedWhenAutoCreateTopicsIsEnabledAtTheBroker
 failed, log available in 
/home/jenkins/jenkins-slave/workspace/kafka-pr-jdk8-scala2.12/connect/runtime/build/reports/testOutput/org.apache.kafka.connect.integration.SourceConnectorsIntegrationTest.testTopicsAreCreatedWhenAutoCreateTopicsIsEnabledAtTheBroker.test.stdout
   
   org.apache.kafka.connect.integration.SourceConnectorsIntegrationTest > 
testTopicsAreCreatedWhenAutoCreateTopicsIsEnabledAtTheBroker FAILED
   java.lang.NullPointerException
   at 
java.util.concurrent.ConcurrentHashMap.get(ConcurrentHashMap.java:936)
   at org.reflections.Store.getAllIncluding(Store.java:82)
   at org.reflections.Store.getAll(Store.java:93)
   at org.reflections.Reflections.getSubTypesOf(Reflections.java:404)
   at 
org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.getPluginDesc(DelegatingClassLoader.java:355)
   at 
org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:340)
   at 
org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:268)
   at 
org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:216)
   at 
org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:209)
   at 
org.apache.kafka.connect.runtime.isolation.Plugins.<init>(Plugins.java:61)
   at 
org.apache.kafka.connect.cli.ConnectDistributed.startConnect(ConnectDistributed.java:91)
   at 
org.apache.kafka.connect.util.clusters.WorkerHandle.start(WorkerHandle.java:50)
   at 
org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.addWorker(EmbeddedConnectCluster.java:167)
   at 
org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.startConnect(EmbeddedConnectCluster.java:253)
   at 
org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.start(EmbeddedConnectCluster.java:137)
   at 
org.apache.kafka.connect.integration.SourceConnectorsIntegrationTest.testTopicsAreCreatedWhenAutoCreateTopicsIsEnabledAtTheBroker(SourceConnectorsIntegrationTest.java:104)
   ```







[GitHub] [kafka] rajinisivaram merged pull request #8954: MINOR; Move quota integration tests to using the new quota API.

2020-07-21 Thread GitBox


rajinisivaram merged pull request #8954:
URL: https://github.com/apache/kafka/pull/8954


   







[GitHub] [kafka] rajinisivaram commented on pull request #8954: MINOR; Move quota integration tests to using the new quota API.

2020-07-21 Thread GitBox


rajinisivaram commented on pull request #8954:
URL: https://github.com/apache/kafka/pull/8954#issuecomment-661906846


   Connect test failure unrelated, merging to trunk.







[GitHub] [kafka] urbandan commented on pull request #8957: KAFKA-5235: GetOffsetShell: support for multiple topics and consumer configuration override

2020-07-21 Thread GitBox


urbandan commented on pull request #8957:
URL: https://github.com/apache/kafka/pull/8957#issuecomment-661892966


   ok to test







[jira] [Commented] (KAFKA-7025) Android client support

2020-07-21 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17162049#comment-17162049
 ] 

Ismael Juma commented on KAFKA-7025:


[~Pccesar] What Android API do you provide support for? The version 
distribution in May looked like:

!image-2020-07-21-06-42-03-140.png!

[Source: 
https://www.androidauthority.com/android-version-distribution-748439/|https://www.androidauthority.com/android-version-distribution-748439/]

Usage for versions older than Oreo is decreasing, as expected, but they still 
account for slightly under 40% of users.

> Android client support
> --
>
> Key: KAFKA-7025
> URL: https://issues.apache.org/jira/browse/KAFKA-7025
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.0.1
>Reporter: Martin Vysny
>Priority: Major
> Attachments: image-2020-07-21-06-42-03-140.png
>
>
> Adding kafka-clients:1.0.0 (also 1.0.1 and 1.1.0) to an Android project would 
> make  the compilation fail: com.android.tools.r8.ApiLevelException: 
> MethodHandle.invoke and MethodHandle.invokeExact are only supported starting 
> with Android O (--min-api 26)
>  
> Would it be possible to make the kafka-clients backward compatible with 
> reasonable Android API (say, 4.4.x) please?





[jira] [Updated] (KAFKA-7025) Android client support

2020-07-21 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7025?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-7025:
---
Attachment: image-2020-07-21-06-42-03-140.png

> Android client support
> --
>
> Key: KAFKA-7025
> URL: https://issues.apache.org/jira/browse/KAFKA-7025
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.0.1
>Reporter: Martin Vysny
>Priority: Major
> Attachments: image-2020-07-21-06-42-03-140.png
>
>
> Adding kafka-clients:1.0.0 (also 1.0.1 and 1.1.0) to an Android project would 
> make  the compilation fail: com.android.tools.r8.ApiLevelException: 
> MethodHandle.invoke and MethodHandle.invokeExact are only supported starting 
> with Android O (--min-api 26)
>  
> Would it be possible to make the kafka-clients backward compatible with 
> reasonable Android API (say, 4.4.x) please?





[GitHub] [kafka] ijuma commented on pull request #9026: KAFKA-10274; Consistent timeouts in transactions_test

2020-07-21 Thread GitBox


ijuma commented on pull request #9026:
URL: https://github.com/apache/kafka/pull/9026#issuecomment-661865679


   Is this ready to be merged?







[GitHub] [kafka] chia7712 commented on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-07-21 Thread GitBox


chia7712 commented on pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#issuecomment-661862318


   @junrao Could you run system tests on both trunk and this PR?
   
   The tests that fail frequently on my local machine are listed below.
   
   1. transactions_test.py #9026
   1. group_mode_transactions_test.py #9026
   1. security_rolling_upgrade_test.py #9021
   1. streams_standby_replica_test.py (I'm digging into it)
   
   Also, I have filed tickets for other failed system tests.







[GitHub] [kafka] serjchebotarev commented on a change in pull request #9028: KAFKA-10035: Safer conversion of consumer timeout parameters

2020-07-21 Thread GitBox


serjchebotarev commented on a change in pull request #9028:
URL: https://github.com/apache/kafka/pull/9028#discussion_r458064223



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
##
@@ -68,52 +92,224 @@ public void after() throws Exception {
 }
 
 @Test
-public void 
testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic() throws 
Exception {
-
super.testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic();
+public void shouldNotAllowToResetWhileStreamsIsRunning() {
+final String appID = getTestId() + "-not-reset-during-runtime";

Review comment:
   Done! :smile: 









[GitHub] [kafka] omkreddy commented on pull request #9022: KAFKA-10158: Improve test behavior and resolve flakiness of test...

2020-07-21 Thread GitBox


omkreddy commented on pull request #9022:
URL: https://github.com/apache/kafka/pull/9022#issuecomment-661829854


   ok to test







[GitHub] [kafka] dajac commented on pull request #8968: KAFKA-10164; Throttle Create Topic, Create Partition and Delete Topic Operations (KIP-599, Part II, Admin Changes)

2020-07-21 Thread GitBox


dajac commented on pull request #8968:
URL: https://github.com/apache/kafka/pull/8968#issuecomment-661828919


   @rajinisivaram Thanks for the review. I have updated the PR to address your 
feedback. Regarding the test case that you suggested, I have added test cases, 
but I am not 100% sure that they cover what you had in mind. Please let me know 
if I have misunderstood your suggestion.







[GitHub] [kafka] dajac commented on a change in pull request #8968: KAFKA-10164; Throttle Create Topic, Create Partition and Delete Topic Operations (KIP-599, Part II, Admin Changes)

2020-07-21 Thread GitBox


dajac commented on a change in pull request #8968:
URL: https://github.com/apache/kafka/pull/8968#discussion_r458059973



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/CreatePartitionsOptions.java
##
@@ -30,6 +30,7 @@
public class CreatePartitionsOptions extends AbstractOptions<CreatePartitionsOptions> {
 
 private boolean validateOnly = false;
+private boolean retryQuotaViolatedException = true;

Review comment:
   `retryOnQuotaViolation` sounds good to me.









[jira] [Commented] (KAFKA-7025) Android client support

2020-07-21 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-7025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17161978#comment-17161978
 ] 

Paulo César commented on KAFKA-7025:


Has there been any progress on this issue?

> Android client support
> --
>
> Key: KAFKA-7025
> URL: https://issues.apache.org/jira/browse/KAFKA-7025
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.0.1
>Reporter: Martin Vysny
>Priority: Major
>
> Adding kafka-clients:1.0.0 (also 1.0.1 and 1.1.0) to an Android project would 
> make  the compilation fail: com.android.tools.r8.ApiLevelException: 
> MethodHandle.invoke and MethodHandle.invokeExact are only supported starting 
> with Android O (--min-api 26)
>  
> Would it be possible to make the kafka-clients backward compatible with 
> reasonable Android API (say, 4.4.x) please?
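For context, the failing API can be reproduced on a desktop JVM with a minimal `java.lang.invoke.MethodHandle` call; the snippet below only illustrates the kind of call site (`MethodHandle.invokeExact`) that makes r8 require `--min-api 26`, and is not taken from the kafka-clients code base.

```java
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;

// Minimal reproduction of the API that trips r8: MethodHandle.invoke and
// MethodHandle.invokeExact call sites are only supported from Android O
// (API 26) on, which is why the build fails for a lower --min-api.
// Runs fine on a desktop JVM.
public class MethodHandleDemo {

    // Looks up String.toUpperCase via a MethodHandle and invokes it exactly.
    public static String upper(String s) {
        try {
            MethodHandle toUpper = MethodHandles.lookup().findVirtual(
                String.class, "toUpperCase", MethodType.methodType(String.class));
            return (String) toUpper.invokeExact(s);
        } catch (Throwable t) {
            throw new RuntimeException(t);
        }
    }

    public static void main(String[] args) {
        System.out.println(upper("kafka")); // prints KAFKA
    }
}
```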



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Issue Comment Deleted] (KAFKA-7025) Android client support

2020-07-21 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-7025?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Paulo César updated KAFKA-7025:
---
Comment: was deleted

(was: Have progress in this issue?
 * [|https://issues.apache.org/jira/secure/AddComment!default.jspa?id=13164934])

> Android client support
> --
>
> Key: KAFKA-7025
> URL: https://issues.apache.org/jira/browse/KAFKA-7025
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.0.1
>Reporter: Martin Vysny
>Priority: Major
>
> Adding kafka-clients:1.0.0 (also 1.0.1 and 1.1.0) to an Android project would 
> make the compilation fail: com.android.tools.r8.ApiLevelException: 
> MethodHandle.invoke and MethodHandle.invokeExact are only supported starting 
> with Android O (--min-api 26)
>  
> Would it be possible to make the kafka-clients backward compatible with 
> reasonable Android API (say, 4.4.x) please?





[jira] [Commented] (KAFKA-7025) Android client support

2020-07-21 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-7025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17161979#comment-17161979
 ] 

Paulo César commented on KAFKA-7025:


Has there been any progress on this issue?

> Android client support
> --
>
> Key: KAFKA-7025
> URL: https://issues.apache.org/jira/browse/KAFKA-7025
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.0.1
>Reporter: Martin Vysny
>Priority: Major
>
> Adding kafka-clients:1.0.0 (also 1.0.1 and 1.1.0) to an Android project would 
> make  the compilation fail: com.android.tools.r8.ApiLevelException: 
> MethodHandle.invoke and MethodHandle.invokeExact are only supported starting 
> with Android O (--min-api 26)
>  
> Would it be possible to make the kafka-clients backward compatible with 
> reasonable Android API (say, 4.4.x) please?





[GitHub] [kafka] omkreddy closed pull request #9046: KAFKA-9432:(follow-up) Set `configKeys` to null in `describeConfigs()` to make it backward compatible with older Kafka versions.

2020-07-21 Thread GitBox


omkreddy closed pull request #9046:
URL: https://github.com/apache/kafka/pull/9046


   







[GitHub] [kafka] omkreddy commented on pull request #9046: KAFKA-9432:(follow-up) Set `configKeys` to null in `describeConfigs()` to make it backward compatible with older Kafka versions.

2020-07-21 Thread GitBox


omkreddy commented on pull request #9046:
URL: https://github.com/apache/kafka/pull/9046#issuecomment-661811555


   @rajinisivaram Thanks for the review. Merging to trunk.







[GitHub] [kafka] rajinisivaram commented on a change in pull request #8968: KAFKA-10164; Throttle Create Topic, Create Partition and Delete Topic Operations (KIP-599, Part II, Admin Changes)

2020-07-21 Thread GitBox


rajinisivaram commented on a change in pull request #8968:
URL: https://github.com/apache/kafka/pull/8968#discussion_r457953384



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/CreatePartitionsOptions.java
##
@@ -48,4 +49,21 @@ public CreatePartitionsOptions validateOnly(boolean 
validateOnly) {
 this.validateOnly = validateOnly;
 return this;
 }
-}
\ No newline at end of file
+
+/**
+ * Set the retry QuotaViolatedException to indicate whether 
QuotaViolatedException

Review comment:
   Several comments refer to QuotaViolatedException which is not an actual 
exception

##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/CreatePartitionsOptions.java
##
@@ -30,6 +30,7 @@
 public class CreatePartitionsOptions extends 
AbstractOptions {
 
 private boolean validateOnly = false;
+private boolean retryQuotaViolatedException = true;

Review comment:
   Since the exception being retried is called 
ThrottlingQuotaExceededException, maybe this should say `retryOnQuotaViolation` 
or something like that without referring to the exception?
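A minimal sketch of how the renamed option could look, following the review suggestion above; the class and method names here are illustrative only, not the final Kafka API.

```java
// Hypothetical sketch of the renamed option discussed in the review above.
// The retried exception is ThrottlingQuotaExceededException, so the option
// name avoids referring to a non-existent "QuotaViolatedException" class.
public class CreatePartitionsOptionsSketch {

    private boolean retryOnQuotaViolation = true;

    // Builder-style setter, matching the other *Options classes.
    public CreatePartitionsOptionsSketch retryOnQuotaViolation(boolean retry) {
        this.retryOnQuotaViolation = retry;
        return this;
    }

    public boolean shouldRetryOnQuotaViolation() {
        return retryOnQuotaViolation;
    }

    public static void main(String[] args) {
        CreatePartitionsOptionsSketch opts =
            new CreatePartitionsOptionsSketch().retryOnQuotaViolation(false);
        System.out.println(opts.shouldRetryOnQuotaViolation()); // prints false
    }
}
```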









[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-07-21 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r457821195



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
##
@@ -177,15 +214,33 @@ public static ApiVersionsResponse 
createApiVersionsResponse(
 }
 }
 
+return new ApiVersionsResponse(
+createApiVersionsResponseData(
+throttleTimeMs,
+Errors.NONE,
+apiKeys,
+latestSupportedFeatures,
+finalizedFeatures,
+finalizedFeaturesEpoch));
+}
+
+public static ApiVersionsResponseData createApiVersionsResponseData(

Review comment:
   It calls into couple other helper functions. Let us keep it.









[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-07-21 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r457986837



##
File path: 
core/src/test/scala/unit/kafka/server/UpdateFinalizedFeaturesTest.scala
##
@@ -0,0 +1,450 @@
+package kafka.server

Review comment:
   Done.









[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-07-21 Thread GitBox


kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r457809543



##
File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
##
@@ -1214,6 +1214,70 @@ default AlterClientQuotasResult 
alterClientQuotas(Collection entries, 
AlterClientQuotasOptions options);
 
+/**
+ * Describes finalized as well as supported features. By default, the 
request is issued to any
+ * broker, but it can be optionally directed only to the controller via 
DescribeFeaturesOptions
+ * parameter.
+ * 
+ * The following exceptions can be anticipated when calling {@code get()} 
on the future from the
+ * returned {@link DescribeFeaturesResult}:
+ * 
+ *   {@link org.apache.kafka.common.errors.TimeoutException}
+ *   If the request timed out before the describe operation could 
finish.
+ * 
+ * 
+ * @param options   the options to use
+ *
+ * @return  the DescribeFeaturesResult containing the result
+ */
+DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options);
+
+/**
+ * Applies specified updates to finalized features. The API is atomic, 
meaning that if a single
+ * feature update in the request can't succeed on the controller, then 
none of the feature
+ * updates are carried out. This request is issued only to the controller 
since the API is
+ * only served by the controller.
+ * 
+ * The API takes as input a set of FinalizedFeatureUpdate that need to be 
applied. Each such
+ * update specifies the finalized feature to be added or updated or 
deleted, along with the new
+ * max feature version level value.
+ * 
+ * Downgrade of feature version level is not a regular 
operation/intent. It is only allowed
+ * in the controller if the feature update has the allowDowngrade flag set 
- setting this flag
+ * conveys user intent to attempt downgrade of a feature max version 
level. Note that despite
+ * the allowDowngrade flag being set, certain downgrades may be rejected 
by the controller if it
+ * is deemed impossible.
+ * Deletion of a finalized feature version is not a regular 
operation/intent. It is allowed
+ * only if the allowDowngrade flag is set in the feature update, and, if 
the max version level
+ * is set to a value less than 1.
+ * 
+ *
+ * The following exceptions can be anticipated when calling {@code get()} 
on the futures
+ * obtained from the returned {@link UpdateFinalizedFeaturesResult}:
+ * 
+ *   {@link 
org.apache.kafka.common.errors.ClusterAuthorizationException}
+ *   If the authenticated user didn't have alter access to the 
cluster.
+ *   {@link org.apache.kafka.common.errors.InvalidRequestException}
+ *   If the request details are invalid. e.g., a non-existing finalized 
feature is attempted
+ *   to be deleted or downgraded.
+ *   {@link org.apache.kafka.common.errors.TimeoutException}
+ *   If the request timed out before the updates could finish. It cannot 
be guaranteed whether
+ *   the updates succeeded or not.
+ *   {@link 
org.apache.kafka.common.errors.FinalizedFeatureUpdateFailedException}
+ *   If the updates could not be applied on the controller, despite the 
request being valid.
+ *   This may be a temporary problem.
+ * 
+ * 
+ * This operation is supported by brokers with version 2.7.0 or higher.
+
+ * @param featureUpdates   the set of finalized feature updates
+ * @param options  the options to use
+ *
+ * @return the UpdateFinalizedFeaturesResult containing 
the result
+ */
+UpdateFinalizedFeaturesResult updateFinalizedFeatures(

Review comment:
   Done.
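The downgrade/deletion rules quoted in the javadoc above can be sketched as a small validity check. This is a hypothetical helper for illustration only, not actual Kafka controller code, and it ignores the case where the controller rejects a downgrade it deems impossible.

```java
// Hypothetical helper illustrating the update rules in the javadoc above:
// a downgrade of the max version level requires allowDowngrade, and a
// deletion (max version level < 1) is likewise only valid with allowDowngrade.
public class FeatureUpdateRules {

    public static boolean isValid(int currentMaxLevel, int newMaxLevel,
                                  boolean allowDowngrade) {
        if (newMaxLevel < 1) {
            return allowDowngrade;          // deletion needs the flag
        }
        if (newMaxLevel < currentMaxLevel) {
            return allowDowngrade;          // downgrade needs the flag
        }
        return true;                        // upgrade or no-op is always valid
    }

    public static void main(String[] args) {
        System.out.println(isValid(3, 2, false)); // prints false
        System.out.println(isValid(3, 0, true));  // prints true
    }
}
```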

##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/FeatureMetadata.java
##
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import java.util.Objects;
+import org.apache.kafka.common.feature.Features;
+import org.apache.kafka.common.feature.FinalizedVersionRange;
+import org.apache.kafka.common.feature.Supported

[GitHub] [kafka] rajinisivaram commented on pull request #8933: KAFKA-10163; Throttle Create Topic, Create Partition and Delete Topic Operations (KIP-599, Part I, Broker Changes)

2020-07-21 Thread GitBox


rajinisivaram commented on pull request #8933:
URL: https://github.com/apache/kafka/pull/8933#issuecomment-661761899


   retest this please







[jira] [Resolved] (KAFKA-10279) Allow dynamic update of certificates with additional SubjectAltNames

2020-07-21 Thread Rajini Sivaram (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10279?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram resolved KAFKA-10279.

  Reviewer: Manikumar
Resolution: Fixed

> Allow dynamic update of certificates with additional SubjectAltNames
> 
>
> Key: KAFKA-10279
> URL: https://issues.apache.org/jira/browse/KAFKA-10279
> Project: Kafka
>  Issue Type: Improvement
>  Components: security
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 2.7.0
>
>
> At the moment, we don't allow dynamic keystore update in brokers if DN and 
> SubjectAltNames don't match exactly. This is to ensure that existing clients 
> and inter-broker communication don't break. Since addition of new entries to 
> SubjectAltNames will not break any authentication, we should allow that and 
> just verify that new SubjectAltNames is a superset of the old one.
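The superset rule described above amounts to a simple set check. Below is a hypothetical sketch of that check, not the actual broker validation code.

```java
import java.util.Set;

// Hypothetical sketch of the rule described above: a dynamic keystore update
// is acceptable when the new certificate's SubjectAltNames form a superset of
// the old ones, so no existing client or inter-broker connection can break.
public class SanUpdateCheck {

    public static boolean isUpdateAllowed(Set<String> oldSans, Set<String> newSans) {
        // Every previously advertised name must still be present.
        return newSans.containsAll(oldSans);
    }

    public static void main(String[] args) {
        Set<String> oldSans = Set.of("broker1.example.com");
        System.out.println(isUpdateAllowed(oldSans,
            Set.of("broker1.example.com", "broker1.internal"))); // prints true
        System.out.println(isUpdateAllowed(oldSans,
            Set.of("broker1.internal")));                        // prints false
    }
}
```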





[GitHub] [kafka] rajinisivaram commented on pull request #9044: KAFKA-10279; Allow dynamic update of certificates with additional SubjectAltNames

2020-07-21 Thread GitBox


rajinisivaram commented on pull request #9044:
URL: https://github.com/apache/kafka/pull/9044#issuecomment-661749475


   @omkreddy Thanks for the review, merging to trunk.







[GitHub] [kafka] rajinisivaram merged pull request #9044: KAFKA-10279; Allow dynamic update of certificates with additional SubjectAltNames

2020-07-21 Thread GitBox


rajinisivaram merged pull request #9044:
URL: https://github.com/apache/kafka/pull/9044


   







[GitHub] [kafka] rajinisivaram commented on pull request #8768: KAFKA-10023: Enforce broker-wide and per-listener connection creation…

2020-07-21 Thread GitBox


rajinisivaram commented on pull request #8768:
URL: https://github.com/apache/kafka/pull/8768#issuecomment-661729947


   ok to test







[GitHub] [kafka] rajinisivaram commented on a change in pull request #8768: KAFKA-10023: Enforce broker-wide and per-listener connection creation…

2020-07-21 Thread GitBox


rajinisivaram commented on a change in pull request #8768:
URL: https://github.com/apache/kafka/pull/8768#discussion_r457944837



##
File path: 
clients/src/main/java/org/apache/kafka/common/metrics/MetricConfig.java
##
@@ -97,5 +99,13 @@ public MetricConfig recordLevel(Sensor.RecordingLevel 
recordingLevel) {
 return this;
 }
 
+public boolean skipReporting() {

Review comment:
   If we are using a sensor to determine throttle time that is different 
from the one in Selector, we might want to expose it as a metric anyway. In 
case of a bug, we want to know this metric, not one in Selector. Perhaps we 
could use `connections-accepted` instead of `connections-created` or something 
like that. In any case, `skipReporting` seems odd, so as @dajac  said, using a 
Sensor that is not added to the metrics registry may be an option too.









[jira] [Commented] (KAFKA-9724) Consumer wrongly ignores fetched records "since it no longer has valid position"

2020-07-21 Thread bright.zhou (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9724?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17161857#comment-17161857
 ] 

bright.zhou commented on KAFKA-9724:


When will this be merged into Kafka 2.4.x? Is there any plan?

> Consumer wrongly ignores fetched records "since it no longer has valid 
> position"
> 
>
> Key: KAFKA-9724
> URL: https://issues.apache.org/jira/browse/KAFKA-9724
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 2.4.0
>Reporter: Oleg Muravskiy
>Assignee: David Arthur
>Priority: Major
> Fix For: 2.6.0, 2.5.1
>
> Attachments: consumer.log.xz
>
>
> After upgrading kafka-client to 2.4.0 (while brokers are still at 2.2.0) 
> consumers in a consumer group intermittently stop progressing on assigned 
> partitions, even when there are messages to consume. This is not a permanent 
> condition, as they progress from time to time, but become slower with time, 
> and catch up after restart.
> Here is a sample of 3 consecutive ignored fetches:
> {noformat}
> 2020-03-15 12:08:58,440 DEBUG [Thread-6] o.a.k.c.c.i.ConsumerCoordinator - 
> Committed offset 538065584 for partition mrt-rrc10-6
> 2020-03-15 12:08:58,541 DEBUG [Thread-6] o.a.k.c.consumer.internals.Fetcher - 
> Skipping validation of fetch offsets for partitions [mrt-rrc10-1, 
> mrt-rrc10-6, mrt-rrc22-7] since the broker does not support the required 
> protocol version (introduced in Kafka 2.3)
> 2020-03-15 12:08:58,549 DEBUG [Thread-6] org.apache.kafka.clients.Metadata - 
> Updating last seen epoch from null to 62 for partition mrt-rrc10-6
> 2020-03-15 12:08:58,557 DEBUG [Thread-6] o.a.k.c.c.i.ConsumerCoordinator - 
> Committed offset 538065584 for partition mrt-rrc10-6
> 2020-03-15 12:08:58,652 DEBUG [Thread-6] o.a.k.c.consumer.internals.Fetcher - 
> Skipping validation of fetch offsets for partitions [mrt-rrc10-1, 
> mrt-rrc10-6, mrt-rrc22-7] since the broker does not support the required 
> protocol version (introduced in Kafka 2.3)
> 2020-03-15 12:08:58,659 DEBUG [Thread-6] org.apache.kafka.clients.Metadata - 
> Updating last seen epoch from null to 62 for partition mrt-rrc10-6
> 2020-03-15 12:08:58,659 DEBUG [Thread-6] o.a.k.c.consumer.internals.Fetcher - 
> Fetch READ_UNCOMMITTED at offset 538065584 for partition mrt-rrc10-6 returned 
> fetch data (error=NONE, highWaterMark=538065631, lastStableOffset = 
> 538065631, logStartOffset = 485284547, preferredReadReplica = absent, 
> abortedTransactions = null, recordsSizeInBytes=16380)
> 2020-03-15 12:08:58,659 DEBUG [Thread-6] o.a.k.c.consumer.internals.Fetcher - 
> Ignoring fetched records for partition mrt-rrc10-6 since it no longer has 
> valid position
> 2020-03-15 12:08:58,665 DEBUG [Thread-6] o.a.k.c.c.i.ConsumerCoordinator - 
> Committed offset 538065584 for partition mrt-rrc10-6
> 2020-03-15 12:08:58,761 DEBUG [Thread-6] o.a.k.c.consumer.internals.Fetcher - 
> Skipping validation of fetch offsets for partitions [mrt-rrc10-1, 
> mrt-rrc10-6, mrt-rrc22-7] since the broker does not support the required 
> protocol version (introduced in Kafka 2.3)
> 2020-03-15 12:08:58,761 DEBUG [Thread-6] o.a.k.c.consumer.internals.Fetcher - 
> Added READ_UNCOMMITTED fetch request for partition mrt-rrc10-6 at position 
> FetchPosition{offset=538065584, offsetEpoch=Optional[62], 
> currentLeader=LeaderAndEpoch{leader=node03.kafka:9092 (id: 3 rack: null), 
> epoch=-1}} to node node03.kafka:9092 (id: 3 rack: null)
> 2020-03-15 12:08:58,761 DEBUG [Thread-6] o.a.k.c.consumer.internals.Fetcher - 
> Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(), 
> implied=(mrt-rrc10-6, mrt-rrc22-7, mrt-rrc10-1)) to broker node03.kafka:9092 
> (id: 3 rack: null)
> 2020-03-15 12:08:58,770 DEBUG [Thread-6] org.apache.kafka.clients.Metadata - 
> Updating last seen epoch from null to 62 for partition mrt-rrc10-6
> 2020-03-15 12:08:58,770 DEBUG [Thread-6] o.a.k.c.consumer.internals.Fetcher - 
> Fetch READ_UNCOMMITTED at offset 538065584 for partition mrt-rrc10-6 returned 
> fetch data (error=NONE, highWaterMark=538065727, lastStableOffset = 
> 538065727, logStartOffset = 485284547, preferredReadReplica = absent, 
> abortedTransactions = null, recordsSizeInBytes=51864)
> 2020-03-15 12:08:58,770 DEBUG [Thread-6] o.a.k.c.consumer.internals.Fetcher - 
> Ignoring fetched records for partition mrt-rrc10-6 since it no longer has 
> valid position
> 2020-03-15 12:08:58,808 DEBUG [Thread-6] o.a.k.c.c.i.ConsumerCoordinator - 
> Committed offset 538065584 for partition mrt-rrc10-6
> {noformat}
> After which consumer makes progress:
> {noformat}
> 2020-03-15 12:08:58,871 DEBUG [Thread-6] o.a.k.c.consumer.internals.Fetcher - 
> Skipping validation of fetch offsets for partitions [mrt-rrc10-1, 
> mrt-rrc1

[GitHub] [kafka] mjsax opened a new pull request #9047: KAFKA-9274: remove `retries` for global tasks

2020-07-21 Thread GitBox


mjsax opened a new pull request #9047:
URL: https://github.com/apache/kafka/pull/9047


- part of KIP-572
- removed the usage of `retries` in `GlobalStateManager`
- instead of `retries`, the new `task.timeout.ms` config is used
   
   Second PR for KIP-572 (cf. #8864)
   
   Call for review @vvcephei 







[GitHub] [kafka] mjsax commented on a change in pull request #9047: KAFKA-9274: remove `retries` for global tasks

2020-07-21 Thread GitBox


mjsax commented on a change in pull request #9047:
URL: https://github.com/apache/kafka/pull/9047#discussion_r457889242



##
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##
@@ -523,6 +524,8 @@
 public static final String STATE_DIR_CONFIG = "state.dir";
 private static final String STATE_DIR_DOC = "Directory location for state 
store. This path must be unique for each streams instance sharing the same 
underlying filesystem.";
 
+public static final String TASK_TIMEOUT_MS_CONFIG = "task.timeout.ms";

Review comment:
   This is just added until the first PR is merged to unblock the work on 
this PR.









[jira] [Commented] (KAFKA-8037) KTable restore may load bad data

2020-07-21 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17161796#comment-17161796
 ] 

Matthias J. Sax commented on KAFKA-8037:


I believe that the optimization is still worth having. That the input topic is 
log-compacted is not a crazy assumption IMHO. And in fact, we have had this 
optimization for a long time and I am not aware of any data loss issues due to 
incorrect retention configuration?

I am also less concerned about Consumed/Materialized. Often users might 
specify the same serde in both places, but it should be possible to detect this 
case. Hence, instead of blindly checking whether we get one or two serdes, we 
should check whether we got the same or different serdes.

Similarly for serialization exception handling: if the default one is used, we 
_know_ that no corrupted data can be in the store and thus it seems safe to 
just do blind bulk loading; what I obviously don't know is how many people 
might use the default. I also strongly believe that async serdes should be 
declared an anti-pattern. Instead of trying to make them work, it might be more 
reasonable to tell people that it's a bad idea and not supported.

In general, I think it makes sense to allow users to opt in/opt out on a 
per-table basis via the API. Whether we need a `useSourceAsChangelog` I am not 
sure. We could also educate people to use `builder.stream().toTable()`; if 
possible, it seems desirable not to increase the API surface if we don't need to.

For GlobalKTables, or actually global state stores, we might want to reconsider 
the design: note that for a GlobalKTable no intermediate processing happens. 
Only for global state stores can people plug in a custom Processor (which is 
currently not allowed to do any actual processing). Having an extra changelog 
topic to begin with does not really make sense (in contrast to the KTable 
case). However, we could make the API simpler: instead of letting users define 
the "store maintainer Processor", we would always provide it (including for the 
PAPI); to allow users to preprocess the data, we let them plug in a 
processor between the source and the store maintainer instead. In the DSL, we 
never plug in an intermediate processor but only have source+storeMaintainer; 
if we detect this pattern, we know that the data was not modified and we can do 
some optimizations during state restore. For all other cases, we need to do the 
restore as regular processing, as suggested by Guozhang.

 

> KTable restore may load bad data
> 
>
> Key: KAFKA-8037
> URL: https://issues.apache.org/jira/browse/KAFKA-8037
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Minor
>  Labels: pull-request-available
>
> If an input topic contains bad data, users can specify a 
> `deserialization.exception.handler` to drop corrupted records on read. 
> However, this mechanism may be by-passed on restore. Assume a 
> `builder.table()` call reads and drops a corrupted record. If the table state 
> is lost and restored from the changelog topic, the corrupted record may be 
> copied into the store, because on restore plain bytes are copied.
> If the KTable is used in a join, an internal `store.get()` call to lookup the 
> record would fail with a deserialization exception if the value part cannot 
> be deserialized.
> GlobalKTables are affected, too (cf. KAFKA-7663 that may allow a fix for 
> GlobalKTable case). It's unclear to me atm, how this issue could be addressed 
> for KTables though.
> Note, that user state stores are not affected, because they always have a 
> dedicated changelog topic (and don't reuse an input topic) and thus the 
> corrupted record would not be written into the changelog.





[GitHub] [kafka] showuon commented on pull request #9029: KAFKA-10255: Fix flaky testOneWayReplicationWithAutoOffsetSync test

2020-07-21 Thread GitBox


showuon commented on pull request #9029:
URL: https://github.com/apache/kafka/pull/9029#issuecomment-661675952


   Thanks @ning2008wisc! @mimaison, do you have any comments on this PR? 
Thanks.







[GitHub] [kafka] showuon commented on pull request #8885: KAFKA-8264: decrease the record size for flaky test

2020-07-21 Thread GitBox


showuon commented on pull request #8885:
URL: https://github.com/apache/kafka/pull/8885#issuecomment-661675572


   Hi @omkreddy, it looks like @hachikuji is not available at the moment. Do you 
think we still need comments from other reviewers? I think this change should be 
pretty straightforward and safe. Thanks.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org