[jira] [Commented] (KAFKA-9113) Clean up task management

2020-01-21 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17020792#comment-17020792
 ] 

ASF GitHub Bot commented on KAFKA-9113:
---

guozhangwang commented on pull request #7997: KAFKA-9113 [WIP]
URL: https://github.com/apache/kafka/pull/7997
 
 
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Clean up task management
> 
>
> Key: KAFKA-9113
> URL: https://issues.apache.org/jira/browse/KAFKA-9113
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.4.0
>Reporter: Sophie Blee-Goldman
>Assignee: Sophie Blee-Goldman
>Priority: Major
>
> Along with KIP-429 we did a lot of refactoring of the task management classes, 
> including the TaskManager and AssignedTasks (and children). While hopefully 
> easier to reason about, there is still significant opportunity for further 
> cleanup, including safer state tracking. Some potential improvements:
> 1) Verify that no tasks are ever in more than one state at once. One 
> possibility is to just check that the suspended, created, restoring, and 
> running maps are all disjoint, but this raises the question of when and where 
> to do those checks, and how often. Another idea might be to put all tasks 
> into a single map and just track their state on a per-task basis (see the 
> sketch after this description). Whatever it is should be aware that some 
> methods are on the critical code path, and should not be burdened with 
> excessive safety checks (e.g. AssignedStreamsTasks#process). Alternatively, it 
> seems to make sense to just make each state its own type. We can then do some 
> cleanup of the AbstractTask and StreamTask classes, which currently contain a 
> number of methods specific to only one type/state of task. For example:
>  * only active running tasks ever need to be suspendable, yet every task goes 
> through suspend and then closeSuspended during close.
>  * as the name suggests, closeSuspended should technically only ever apply to 
> suspended tasks
>  * the code paths needed to perform certain actions such as closing or 
> committing a task vary widely between the different states. A restoring task 
> need only close its state manager, but skipping the task.close call and 
> calling only closeStateManager has led to confusion and time wasted trying 
> to remember or ask someone why that is sufficient
> 2) Cleanup of closing and/or shutdown logic – there are some potential 
> improvements to be made here as well; for example, AssignedTasks currently 
> implements a closeZombieTask method despite the fact that standby tasks are 
> never zombies. 
> 3) The StoreChangelogReader also interacts with (only) the 
> AssignedStreamsTasks class, through the TaskManager. It can be difficult to 
> reason about these interactions and the state of the changelog reader.
> 4) All 4 classes and their state have very strict consistency requirements 
> that are currently almost impossible to verify, which has already resulted in 
> several bugs that we were lucky to catch in time. We should tighten up how 
> these classes manage their own state, and how the overall state is managed 
> between them, so that it is easy to make changes without introducing new bugs 
> because one class updated its own state without knowing it needed to tell 
> another class to also update its own.
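A minimal sketch of the single-map idea from item 1, with hypothetical names 
(TaskRegistry and TaskState are illustrative, not the actual Streams classes):

{code:java}
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: one map holds every task, and each entry carries its
// state, so "a task is in exactly one state" holds by construction.
class TaskRegistry {
    enum TaskState { CREATED, RESTORING, RUNNING, SUSPENDED }

    private final Map<String, TaskState> tasks = new HashMap<>();

    void transition(String taskId, TaskState newState) {
        // A single put replaces the old state; a task can never appear under
        // two states at once, so no cross-map disjointness check is needed
        // on the critical path (e.g. during process()).
        tasks.put(taskId, newState);
    }

    TaskState stateOf(String taskId) {
        return tasks.get(taskId);
    }
}
{code}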



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8733) Offline partitions occur when leader's disk is slow in reads while responding to follower fetch requests.

2020-01-21 Thread Satish Duggana (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8733?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17020743#comment-17020743
 ] 

Satish Duggana commented on KAFKA-8733:
---

[~flavr] Thanks for letting us know that you are also encountering the same 
issue. There is a discussion thread[1] on the dev@kafka mailing list. Waiting for 
others to comment on/close the discussion before starting the voting mail thread.

1. 
[https://lists.apache.org/thread.html/243dcc267f7ba79f508bcc4cbaa77a41d2454cba9359173bb08e875e%40%3Cdev.kafka.apache.org%3E]

 

> Offline partitions occur when leader's disk is slow in reads while responding 
> to follower fetch requests.
> -
>
> Key: KAFKA-8733
> URL: https://issues.apache.org/jira/browse/KAFKA-8733
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.1.2, 2.4.0
>Reporter: Satish Duggana
>Assignee: Satish Duggana
>Priority: Critical
> Attachments: weighted-io-time-2.png, wio-time.png
>
>
> We found the offline partitions issue multiple times on some of the hosts in 
> our clusters. After going through the broker logs and the hosts' disk stats, it 
> looks like this issue occurs whenever the read/write operations take more 
> time on that disk. In a particular case where the read time is more than 
> replica.lag.time.max.ms, follower replicas will be out of sync as their 
> earlier fetch requests are stuck while reading the local log and their fetch 
> status is not yet updated, as shown in the below code of `ReplicaManager`. 
> If there is an issue in reading the data from the log for a duration longer 
> than replica.lag.time.max.ms, then all the replicas will be out of sync and 
> the partition becomes offline if min.insync.replicas > 1 and 
> unclean.leader.election.enable is false.
>  
> {code:java}
> def readFromLog(): Seq[(TopicPartition, LogReadResult)] = {
>   val result = readFromLocalLog( // this call took more than `replica.lag.time.max.ms`
>     replicaId = replicaId,
>     fetchOnlyFromLeader = fetchOnlyFromLeader,
>     readOnlyCommitted = fetchOnlyCommitted,
>     fetchMaxBytes = fetchMaxBytes,
>     hardMaxBytesLimit = hardMaxBytesLimit,
>     readPartitionInfo = fetchInfos,
>     quota = quota,
>     isolationLevel = isolationLevel)
>   if (isFromFollower) updateFollowerLogReadResults(replicaId, result)
>     // fetch time gets updated here, but maybeShrinkIsr may already have been
>     // called and the replica removed from the ISR
>   else result
> }
> 
> val logReadResults = readFromLog()
> {code}
> Attached the graphs of disk weighted io time stats when this issue occurred.
> I will raise [KIP-501|https://s.apache.org/jhbpn] describing options on how 
> to handle this scenario.
>  
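As a hedged illustration of the failure mode described above (simplified names, 
not the actual broker code): a follower drops out of the ISR once it has not 
caught up within replica.lag.time.max.ms, and a fetch request stuck in a slow 
local-log read prevents it from ever catching up.

{code:java}
// Simplified sketch of the lag check; names are illustrative.
class IsrLagCheck {
    boolean isOutOfSync(long nowMs, long lastCaughtUpTimeMs, long replicaLagTimeMaxMs) {
        // lastCaughtUpTimeMs only advances when the follower's fetch result is
        // processed; a fetch stuck in a slow disk read never advances it, so
        // the ISR-shrink pass eventually removes the replica from the ISR.
        return nowMs - lastCaughtUpTimeMs > replicaLagTimeMaxMs;
    }
}
{code}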



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9355) RocksDB statistics are removed from JMX when EOS enabled and empty local state dir

2020-01-21 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17020733#comment-17020733
 ] 

Bruno Cadonna commented on KAFKA-9355:
--

The cause of the bug is the following. In EOS, when a task is not shut down 
gracefully (i.e., no checkpoint is found, which happens when the local state 
directory is wiped out), the state stores are reinitialized. During 
reinitialization all metrics on the state store level are removed when the state 
store is closed on the `Metered*Store` level. Afterwards, when the store is 
initialized again, the RocksDB metrics recorder does not know that the RocksDB 
metrics were removed by `Metered*Store` and does not reinitialize them. The 
above PR fixes this issue. 
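A minimal sketch of the shape of the fix (method bodies are hypothetical; the 
actual change is in the PR referenced above): the recorder gets an init() hook 
that the store calls on every (re)initialization, so metrics removed during a 
close are registered again.

{code:java}
// Hypothetical sketch, not the actual Streams code.
class RocksDbMetricsRecorderSketch {
    private boolean metricsRegistered = false;

    // Called from the store's init(), including after an EOS wipe-and-restore.
    void init() {
        if (!metricsRegistered) {
            registerMetrics();     // re-create metrics such as number-open-files
            metricsRegistered = true;
        }
    }

    void close() {
        removeMetrics();           // Metered*Store close removes the metrics
        metricsRegistered = false; // so the next init() registers them again
    }

    private void registerMetrics() { /* add gauges to the metrics registry */ }
    private void removeMetrics()   { /* remove them from the registry */ }
}
{code}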

> RocksDB statistics are removed from JMX when EOS enabled and empty local 
> state dir
> --
>
> Key: KAFKA-9355
> URL: https://issues.apache.org/jira/browse/KAFKA-9355
> Project: Kafka
>  Issue Type: Bug
>  Components: metrics, streams
>Affects Versions: 2.4.0
>Reporter: Stanislav Savulchik
>Assignee: Bruno Cadonna
>Priority: Major
> Attachments: metric-removal.log
>
>
> *Steps to Reproduce*
> Set processing.guarantee = exactly_once and remove the local state dir in order 
> to force state restoration from changelog topics, which have to be non-empty.
> *Expected Behavior*
> There are registered MBeans like 
> kafka.streams:type=stream-state-metrics,client-id=-678e4b25-8fc7-4266-85a0-7a5fe52a4060-StreamThread-1,task-id=0_0,rocksdb-state-id=
>  for persistent RocksDB KeyValueStore-s after streams task state restoration.
> *Actual Behavior*
> There are no registered MBeans like above after streams task state 
> restoration.
> *Details*
> I managed to inject a custom MetricsReporter in order to log metricChange and 
> metricRemoval calls. According to the logs, at some point the missing metrics 
> are removed and never restored later. Here is an excerpt for the 
> number-open-files metric:
> {noformat}
> 16:33:40.403 DEBUG c.m.r.LoggingMetricsReporter - metricChange MetricName 
> [name=number-open-files, group=stream-state-metrics, description=Number of 
> currently open files, 
> tags={client-id=morpheus.conversion-678e4b25-8fc7-4266-85a0-7a5fe52a4060-StreamThread-1,
>  task-id=0_0, rocksdb-state-id=buffered-event}]
> 16:33:40.403 DEBUG o.a.k.s.s.i.m.RocksDBMetricsRecorder - [RocksDB Metrics 
> Recorder for buffered-event] Adding metrics recorder of task 0_0 to metrics 
> recording trigger
> 16:33:40.403 DEBUG o.a.k.s.s.i.m.RocksDBMetricsRecorder - [RocksDB Metrics 
> Recorder for buffered-event] Adding statistics for store buffered-event of 
> task 0_0
> 16:33:40.610 INFO  o.a.k.s.p.i.StoreChangelogReader - stream-thread 
> [morpheus.conversion-678e4b25-8fc7-4266-85a0-7a5fe52a4060-StreamThread-1] No 
> checkpoint found for task 0_0 state store buffered-event changelog 
> morpheus.conversion-buffered-event-changelog-0 with EOS turned on. 
> Reinitializing the task and restore its state from the beginning.
> 16:33:40.610 DEBUG o.a.k.s.s.i.m.RocksDBMetricsRecorder - [RocksDB Metrics 
> Recorder for buffered-event] Removing statistics for store buffered-event of 
> task 0_0
> 16:33:40.610 DEBUG o.a.k.s.s.i.m.RocksDBMetricsRecorder - [RocksDB Metrics 
> Recorder for buffered-event] Removing metrics recorder for store 
> buffered-event of task 0_0 from metrics recording trigger
> 16:33
> 16:33:40.611 DEBUG c.m.r.LoggingMetricsReporter - metricRemoval MetricName 
> [name=number-open-files, group=stream-state-metrics, description=Number of 
> currently open files, 
> tags={client-id=morpheus.conversion-678e4b25-8fc7-4266-85a0-7a5fe52a4060-StreamThread-1,
>  task-id=0_0, rocksdb-state-id=buffered-event}]
> 16:33:40.625 DEBUG o.a.k.s.s.i.m.RocksDBMetricsRecorder - [RocksDB Metrics 
> Recorder for buffered-event] Adding metrics recorder of task 0_0 to metrics 
> recording trigger
> 16:33:40.625 DEBUG o.a.k.s.s.i.m.RocksDBMetricsRecorder - [RocksDB Metrics 
> Recorder for buffered-event] Adding statistics for store buffered-event of 
> task 0_0
> ...
> (no more calls to metricChange for the removed number-open-files 
> metric){noformat}
> A complete log is also attached: [^metric-removal.log]
> Metric removal happens along this call stack:
> {noformat}
> 19:27:35.509 DEBUG c.m.r.LoggingMetricsReporter - metricRemoval MetricName 
> [name=number-open-files, group=stream-state-metrics, description=Number of 
> currently open files, 
> tags={client-id=morpheus.conversion-9b76f302-7149-47de-b17b-362d642e05d5-StreamThread-1,
>  task-id=0_0, rocksdb-state-id=buffered-event}]
> java.lang.Exception: null
>at 
> casino.morpheus.reporter.LoggingMetricsReporter.metricRemoval(LoggingMetricsReporter.scala:24)
>at 

[jira] [Created] (KAFKA-9463) Transient failure in KafkaAdminClientTest.testListOffsets

2020-01-21 Thread Ted Yu (Jira)
Ted Yu created KAFKA-9463:
-

 Summary: Transient failure in KafkaAdminClientTest.testListOffsets
 Key: KAFKA-9463
 URL: https://issues.apache.org/jira/browse/KAFKA-9463
 Project: Kafka
  Issue Type: Test
Reporter: Ted Yu


When running tests with Java 11, I got the following test failure:
{code}
java.util.concurrent.ExecutionException: 
org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node 
assignment.
at 
org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
at 
org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
at 
org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
at 
org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
at 
org.apache.kafka.clients.admin.KafkaAdminClientTest.testListOffsets(KafkaAdminClientTest.java:2336)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:288)
at 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:282)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting 
for a node assignment.
{code}
KafkaAdminClientTest.testListOffsets passes when it is run alone.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9355) RocksDB statistics are removed from JMX when EOS enabled and empty local state dir

2020-01-21 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17020731#comment-17020731
 ] 

ASF GitHub Bot commented on KAFKA-9355:
---

cadonna commented on pull request #7996: KAFKA-9355: Fix bug that removed 
RocksDB metrics after failure in EOS
URL: https://github.com/apache/kafka/pull/7996
 
 
   - Added `init()` method to `RocksDBMetricsRecorder`
   - Added call to `init()` of `RocksDBMetricsRecorder` to `init()` of RocksDB 
store
   - Added call to `init()` of `RocksDBMetricsRecorder` to `openExisting()` of 
segmented state stores
   - Adapted unit tests
   - Added integration test that reproduces the situation in which the bug 
occurred
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> RocksDB statistics are removed from JMX when EOS enabled and empty local 
> state dir
> --
>
> Key: KAFKA-9355
> URL: https://issues.apache.org/jira/browse/KAFKA-9355
> Project: Kafka
>  Issue Type: Bug
>  Components: metrics, streams
>Affects Versions: 2.4.0
>Reporter: Stanislav Savulchik
>Assignee: Bruno Cadonna
>Priority: Major
> Attachments: metric-removal.log
>
>
> *Steps to Reproduce*
> Set processing.guarantee = exactly_once and remove the local state dir in order 
> to force state restoration from changelog topics, which have to be non-empty.
> *Expected Behavior*
> There are registered MBeans like 
> kafka.streams:type=stream-state-metrics,client-id=-678e4b25-8fc7-4266-85a0-7a5fe52a4060-StreamThread-1,task-id=0_0,rocksdb-state-id=
>  for persistent RocksDB KeyValueStore-s after streams task state restoration.
> *Actual Behavior*
> There are no registered MBeans like above after streams task state 
> restoration.
> *Details*
> I managed to inject a custom MetricsReporter in order to log metricChange and 
> metricRemoval calls. According to the logs, at some point the missing metrics 
> are removed and never restored later. Here is an excerpt for the 
> number-open-files metric:
> {noformat}
> 16:33:40.403 DEBUG c.m.r.LoggingMetricsReporter - metricChange MetricName 
> [name=number-open-files, group=stream-state-metrics, description=Number of 
> currently open files, 
> tags={client-id=morpheus.conversion-678e4b25-8fc7-4266-85a0-7a5fe52a4060-StreamThread-1,
>  task-id=0_0, rocksdb-state-id=buffered-event}]
> 16:33:40.403 DEBUG o.a.k.s.s.i.m.RocksDBMetricsRecorder - [RocksDB Metrics 
> Recorder for buffered-event] Adding metrics recorder of task 0_0 to metrics 
> recording trigger
> 16:33:40.403 DEBUG o.a.k.s.s.i.m.RocksDBMetricsRecorder - [RocksDB Metrics 
> Recorder for buffered-event] Adding statistics for store buffered-event of 
> task 0_0
> 16:33:40.610 INFO  o.a.k.s.p.i.StoreChangelogReader - stream-thread 
> [morpheus.conversion-678e4b25-8fc7-4266-85a0-7a5fe52a4060-StreamThread-1] No 
> checkpoint found for task 0_0 state store buffered-event changelog 
> morpheus.conversion-buffered-event-changelog-0 with EOS turned on. 
> Reinitializing the task and restore its state from the beginning.
> 16:33:40.610 DEBUG o.a.k.s.s.i.m.RocksDBMetricsRecorder - [RocksDB Metrics 
> Recorder for buffered-event] Removing statistics for store buffered-event of 
> task 0_0
> 16:33:40.610 DEBUG o.a.k.s.s.i.m.RocksDBMetricsRecorder - [RocksDB Metrics 
> Recorder for buffered-event] Removing metrics recorder for store 
> buffered-event of task 0_0 from metrics recording trigger
> 16:33
> 16:33:40.611 DEBUG c.m.r.LoggingMetricsReporter - metricRemoval MetricName 
> [name=number-open-files, group=stream-state-metrics, description=Number of 
> currently open files, 
> tags={client-id=morpheus.conversion-678e4b25-8fc7-4266-85a0-7a5fe52a4060-StreamThread-1,
>  task-id=0_0, rocksdb-state-id=buffered-event}]
> 16:33:40.625 DEBUG o.a.k.s.s.i.m.RocksDBMetricsRecorder - [RocksDB Metrics 
> Recorder for buffered-event] Adding metrics recorder of task 0_0 to metrics 
> recording trigger
> 16:33:40.625 DEBUG o.a.k.s.s.i.m.RocksDBMetricsRecorder - [RocksDB Metrics 
> Recorder for buffered-event] Adding statistics for store buffered-event of 
> task 0_0
> ...
> (no more calls to metricChange for the removed number-open-files 
> metric){noformat}
> A complete log is also attached: [^metric-removal.log]
> Metric removal happens along this call stack:
> {noformat}
> 19:27:35.509 DEBUG 

[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0

2020-01-21 Thread leibo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17020027#comment-17020027
 ] 

leibo edited comment on KAFKA-8532 at 1/22/20 12:56 AM:


[~junrao] I did a further analysis, as below:

The zk client heartbeat thread is org.apache.zookeeper.ClientCnxn$SendThread:
{code:java}
SendThread(ClientCnxnSocket clientCnxnSocket) {
    super(ClientCnxn.makeThreadName("-SendThread()"));
    ClientCnxn.this.state = States.CONNECTING; // init state
    this.clientCnxnSocket = clientCnxnSocket;
    this.setDaemon(true);
}
{code}
As we can see, the zk heartbeat thread SendThread's initial state is *CONNECTING*,
and when the zk session has expired, zookeeper.getState() is *CONNECTING*.
{code:java}
public enum States {
    CONNECTING, ASSOCIATING, CONNECTED, CONNECTEDREADONLY,
    CLOSED, AUTH_FAILED, NOT_CONNECTED;

    public boolean isAlive() {
        // When the zk client state is CONNECTING or NOT_CONNECTED the zk
        // session is obviously not valid, so we cannot regard zookeeper
        // as alive.
        return this != CLOSED && this != AUTH_FAILED;
    }
    public boolean isConnected() {
        return this == CONNECTED || this == CONNECTEDREADONLY;
    }
}
{code}
According to the above code, when the zk session expires, 
ClientCnxn.SendThread will attempt to reconnect to the zk server and establish a 
new session. Its initial state is CONNECTING, so if the zk-session-expiry-handler 
thread enters the reinitialize method while the zookeeper state is CONNECTING, it 
will skip the *if (!connectionState.isAlive)* condition, execute 
callAfterInitializingSession, and trigger controller-event-thread to process 
RegisterBrokerAndReelect while the zk session is still invalid.
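A hedged sketch of the stricter guard this analysis suggests (illustrative 
only, not a proposed patch): isConnected() distinguishes an established session 
from the CONNECTING state, while isAlive() does not.

{code:java}
// Illustrative sketch; not the actual Kafka ZooKeeper-client code.
class SessionGuard {
    void reinitialize(org.apache.zookeeper.ZooKeeper zk) {
        // isAlive() is true even in CONNECTING/NOT_CONNECTED, so it cannot
        // tell whether the new session is established; isConnected() can.
        if (!zk.getState().isConnected()) {
            return; // do not run callAfterInitializingSession on an invalid session
        }
        // ... safe to run the session-initialization callbacks here ...
    }
}
{code}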



> controller-event-thread deadlock with zk-session-expiry-handler0
> 
>
> Key: KAFKA-8532
> URL: https://issues.apache.org/jira/browse/KAFKA-8532
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.1
>Reporter: leibo
>Priority: Blocker
> Attachments: controllerDeadLockAnalysis-2020-01-20.png, js.log, 
> js0.log, js1.log, js2.log
>
>
> We have observed a serious deadlock between the controller-event-thread and 
> the zk-session-expiry-handler thread. When this issue occurs, the only way 
> to recover the kafka cluster is to restart the kafka server. The following is 
> the jstack log of the controller-event-thread and zk-session-expiry-handler 
> thread.
> "zk-session-expiry-handler0" #163089 daemon prio=5 os_prio=0 
> tid=0x7fcc9c01 nid=0xfb22 waiting on condition [0x7fcbb01f8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0005ee3f7000> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) // 
> 

[jira] [Commented] (KAFKA-9462) Correct exception message in DistributedHerder

2020-01-21 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9462?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17020672#comment-17020672
 ] 

ASF GitHub Bot commented on KAFKA-9462:
---

tedyu commented on pull request #7995: KAFKA-9462: Correct exception message in 
DistributedHerder
URL: https://github.com/apache/kafka/pull/7995
 
 
   There are a few exception messages in DistributedHerder which were copied 
from other exception messages.
   
   This PR corrects the messages to reflect the actual conditions.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Correct exception message in DistributedHerder
> --
>
> Key: KAFKA-9462
> URL: https://issues.apache.org/jira/browse/KAFKA-9462
> Project: Kafka
>  Issue Type: Bug
>Reporter: Ted Yu
>Priority: Minor
>
> There are a few exception messages in DistributedHerder which were copied 
> from other exception messages.
> This task corrects the messages to reflect the actual conditions.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9462) Correct exception message in DistributedHerder

2020-01-21 Thread Ted Yu (Jira)
Ted Yu created KAFKA-9462:
-

 Summary: Correct exception message in DistributedHerder
 Key: KAFKA-9462
 URL: https://issues.apache.org/jira/browse/KAFKA-9462
 Project: Kafka
  Issue Type: Bug
Reporter: Ted Yu


There are a few exception messages in DistributedHerder which were copied from 
other exception messages.

This task corrects the messages to reflect the actual conditions.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9459) MM2 sync topic config does not work

2020-01-21 Thread Badai Aqrandista (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17020660#comment-17020660
 ] 

Badai Aqrandista commented on KAFKA-9459:
-

I think this has been raised before in this comment: 
https://issues.apache.org/jira/browse/KAFKA-7500?focusedCommentId=16931473&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16931473

> MM2 sync topic config does not work
> ---
>
> Key: KAFKA-9459
> URL: https://issues.apache.org/jira/browse/KAFKA-9459
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.4.0
>Reporter: Badai Aqrandista
>Priority: Major
>
> I have MM2 configured as follow:
> {code:java}
> {
> "name": "mm2-from-1-to-2",
> "config": {
>   
> "connector.class":"org.apache.kafka.connect.mirror.MirrorSourceConnector",
>   "topics":"foo",
>   "key.converter": 
> "org.apache.kafka.connect.converters.ByteArrayConverter",
>   "value.converter": 
> "org.apache.kafka.connect.converters.ByteArrayConverter",
>   "sync.topic.configs.enabled":"true",
>   "sync.topic.configs.interval.seconds": 60,
>   "sync.topic.acls.enabled": "false",
>   "replication.factor": 1,
>   "offset-syncs.topic.replication.factor": 1,
>   "heartbeats.topic.replication.factor": 1,
>   "checkpoints.topic.replication.factor": 1,
>   "target.cluster.alias":"dest",
>   "target.cluster.bootstrap.servers":"dest.example.com:9092",
>   "source.cluster.alias":"src",
>   "source.cluster.bootstrap.servers":"src.example.com:9092",
>   "tasks.max": 1}
> }
> {code}
> Topic "foo" is configured with "cleanup.policy=compact". But after waiting 
> for 15 minutes, I still don't see "src.foo" in the destination cluster has 
> "cleanup.policy=compact".
> I had the connect node to run in TRACE level and I could not find any calls 
> to describeConfigs 
> (https://github.com/apache/kafka/blob/2.4.0/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java#L327).
>  This implies it never actually get a list of topics that it needs to get 
> topic configs from.
> And I am suspecting this code always return empty Set 
> (https://github.com/apache/kafka/blob/2.4.0/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java#L214-L220):
> {code:java}
> private Set<String> topicsBeingReplicated() {
>     return knownTopicPartitions.stream()
>         .map(x -> x.topic())
>         .distinct()
>         .filter(x -> knownTargetTopics.contains(formatRemoteTopic(x)))
>         .collect(Collectors.toSet());
> }
> {code}
> knownTopicPartitions contains topic-partitions from the source cluster.
> knownTargetTopics contains topic-partitions from the target cluster, whose 
> topic names already contain the source alias.
> So, why is topicsBeingReplicated (the list of topic-partitions from the source 
> cluster) being filtered using knownTargetTopics (the list of topic-partitions 
> from the target cluster)?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9423) Refine layout of configuration options on website and make individual settings directly linkable

2020-01-21 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17020648#comment-17020648
 ] 

ASF GitHub Bot commented on KAFKA-9423:
---

soenkeliebau commented on pull request #251: KAFKA-9423: Refine layout of 
configuration options on website and make individual settings directly linkable
URL: https://github.com/apache/kafka-site/pull/251
 
 
   This pull request goes together with 
https://github.com/apache/kafka/pull/7955 to achieve the desired effect.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Refine layout of configuration options on website and make individual 
> settings directly linkable
> 
>
> Key: KAFKA-9423
> URL: https://issues.apache.org/jira/browse/KAFKA-9423
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation
>Reporter: Sönke Liebau
>Assignee: Sönke Liebau
>Priority: Trivial
> Attachments: option1.png, option2.png, option3.png, option4.png
>
>
> KAFKA-8474 changed the layout of configuration options on the website from a 
> table which over time ran out of horizontal space to a list.
> This vastly improved readability but is not yet ideal. Further discussion was 
> had in the [PR|https://github.com/apache/kafka/pull/6870] for KAFKA-8474.
> This ticket is to move that discussion to a separate thread, make it more 
> visible to other people, and give subsequent PRs a home.
> Currently proposed options are attached to this issue.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9447) Add examples for EOS standalone and group mode under kafka/examples

2020-01-21 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9447?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen updated KAFKA-9447:
---
Description: 
Although we have integration tests for the EOS model, it would be best to also 
put them in the examples for people to use.

Also, considering the comment here: 
[https://github.com/apache/kafka/pull/7952#discussion_r368313968] it would be 
important for us to utilize the API before jumping to a conclusion about what 
would be the best option we have at hand.

  was:Although we have integration tests for the EOS model, it would be best to 
also put them in the examples for people to use.


> Add examples for EOS standalone and group mode under kafka/examples
> ---
>
> Key: KAFKA-9447
> URL: https://issues.apache.org/jira/browse/KAFKA-9447
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Boyang Chen
>Priority: Major
>
> Although we have integration tests for the EOS model, it would be best to also 
> put them in the examples for people to use.
> Also, considering the comment here: 
> [https://github.com/apache/kafka/pull/7952#discussion_r368313968] it would be 
> important for us to utilize the API before jumping to a conclusion about what 
> would be the best option we have at hand.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9143) DistributedHerder misleadingly log error on connector task reconfiguration

2020-01-21 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-9143:
-
Fix Version/s: 2.4.1
   2.3.2
   2.5.0
   2.2.3

> DistributedHerder misleadingly log error on connector task reconfiguration
> --
>
> Key: KAFKA-9143
> URL: https://issues.apache.org/jira/browse/KAFKA-9143
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Ivan Yurchenko
>Assignee: Ivan Yurchenko
>Priority: Minor
> Fix For: 2.2.3, 2.5.0, 2.3.2, 2.4.1
>
>
> In {{DistributedHerder}} in {{reconfigureConnectorTasksWithRetry}} method 
> there's a 
> [callback|https://github.com/apache/kafka/blob/c552c06aed50b4d4d9a85f73ccc89bc06fa7e094/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1247]:
> {code:java}
> @Override
> public void onCompletion(Throwable error, Void result) {
> log.error("Unexpected error during connector task reconfiguration: ", 
> error);
> log.error("Task reconfiguration for {} failed unexpectedly, this 
> connector will not be properly reconfigured unless manually triggered.", 
> connName);
> }
> {code}
> It logs an error message even when the operation succeeded (i.e., {{error}} is 
> {{null}}).
> It should include an {{if (error != null)}} condition, like in the same class 
> [in another 
> method|https://github.com/apache/kafka/blob/c552c06aed50b4d4d9a85f73ccc89bc06fa7e094/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L792].
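A hedged sketch of the guarded callback the description calls for (Callback is 
the Connect runtime interface; connName is the connector name captured by the 
enclosing method, as in DistributedHerder):

{code:java}
import org.apache.kafka.connect.util.Callback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class GuardedReconfigureCallback implements Callback<Void> {
    private static final Logger log = LoggerFactory.getLogger(GuardedReconfigureCallback.class);
    private final String connName;

    GuardedReconfigureCallback(String connName) {
        this.connName = connName;
    }

    @Override
    public void onCompletion(Throwable error, Void result) {
        if (error != null) { // only log when reconfiguration actually failed
            log.error("Unexpected error during connector task reconfiguration: ", error);
            log.error("Task reconfiguration for {} failed unexpectedly, this connector "
                    + "will not be properly reconfigured unless manually triggered.", connName);
        }
    }
}
{code}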



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9143) DistributedHerder misleadingly log error on connector task reconfiguration

2020-01-21 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17020558#comment-17020558
 ] 

ASF GitHub Bot commented on KAFKA-9143:
---

rhauch commented on pull request #7648: KAFKA-9143: Log task reconfiguration 
error only when it happened
URL: https://github.com/apache/kafka/pull/7648
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> DistributedHerder misleadingly log error on connector task reconfiguration
> --
>
> Key: KAFKA-9143
> URL: https://issues.apache.org/jira/browse/KAFKA-9143
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Ivan Yurchenko
>Assignee: Ivan Yurchenko
>Priority: Minor
>
> In {{DistributedHerder}} in {{reconfigureConnectorTasksWithRetry}} method 
> there's a 
> [callback|https://github.com/apache/kafka/blob/c552c06aed50b4d4d9a85f73ccc89bc06fa7e094/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1247]:
> {code:java}
> @Override
> public void onCompletion(Throwable error, Void result) {
> log.error("Unexpected error during connector task reconfiguration: ", 
> error);
> log.error("Task reconfiguration for {} failed unexpectedly, this 
> connector will not be properly reconfigured unless manually triggered.", 
> connName);
> }
> {code}
> It logs an error message even when the operation succeeded (i.e., {{error}} is 
> {{null}}).
> It should include an {{if (error != null)}} condition, like in the same class 
> [in another 
> method|https://github.com/apache/kafka/blob/c552c06aed50b4d4d9a85f73ccc89bc06fa7e094/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L792].



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-9024) org.apache.kafka.connect.transforms.ValueToKey throws NPE

2020-01-21 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9024?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch resolved KAFKA-9024.
--
  Reviewer: Randall Hauch
Resolution: Fixed

Merged to `trunk` and backported to the `2.4`, `2.3`, and `2.2` branches, since 
we typically backport only to the last 2-3 branches.

> org.apache.kafka.connect.transforms.ValueToKey throws NPE
> -
>
> Key: KAFKA-9024
> URL: https://issues.apache.org/jira/browse/KAFKA-9024
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Robin Moffatt
>Assignee: Nigel Liang
>Priority: Minor
> Fix For: 2.2.3, 2.5.0, 2.3.2, 2.4.1
>
>
> If a field named in the SMT does not exist, an NPE is thrown. This is not 
> helpful to users; it should be caught correctly and reported back in a more 
> friendly way.
> For example, importing data from a database with this transform: 
>  
> {code:java}
> transforms = [ksqlCreateKey, ksqlExtractString]
> transforms.ksqlCreateKey.fields = [ID]
> transforms.ksqlCreateKey.type = class 
> org.apache.kafka.connect.transforms.ValueToKey
> transforms.ksqlExtractString.field = ID
> transforms.ksqlExtractString.type = class 
> org.apache.kafka.connect.transforms.ExtractField$Key
> {code}
> If the field name is {{id}} rather than {{ID}}, then the task fails:
> {code:java}
> org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error 
> handler
>at 
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
>at 
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
>at 
> org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
>at 
> org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:293)
>at 
> org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:229)
>at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
>at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
>at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NullPointerException
>at 
> org.apache.kafka.connect.transforms.ValueToKey.applyWithSchema(ValueToKey.java:85)
>at org.apache.kafka.connect.transforms.ValueToKey.apply(ValueToKey.java:65)
>at 
> org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
>at 
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
>at 
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
>... 11 more
> {code}
>  
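A hedged sketch of the friendlier check (names follow the stack trace above; 
the actual fix is in the PR attached to this issue):

{code:java}
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.DataException;

class FieldLookupSketch {
    // Validate the configured field up front instead of letting a null Field
    // surface as an NPE deep inside applyWithSchema().
    Object extractField(Struct value, String fieldName) {
        Field field = value.schema().field(fieldName); // null if the field is absent
        if (field == null) {
            throw new DataException("Field does not exist: " + fieldName
                    + "; available fields: " + value.schema().fields());
        }
        return value.get(field);
    }
}
{code}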



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9024) org.apache.kafka.connect.transforms.ValueToKey throws NPE

2020-01-21 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9024?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-9024:
-
Fix Version/s: 2.4.1
   2.3.2
   2.5.0
   2.2.3

> org.apache.kafka.connect.transforms.ValueToKey throws NPE
> -
>
> Key: KAFKA-9024
> URL: https://issues.apache.org/jira/browse/KAFKA-9024
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Robin Moffatt
>Assignee: Nigel Liang
>Priority: Minor
> Fix For: 2.2.3, 2.5.0, 2.3.2, 2.4.1
>
>
> If a field named in the SMT does not exist, an NPE is thrown. This is not 
> helpful to users; it should be caught correctly and reported back in a more 
> friendly way.
> For example, importing data from a database with this transform: 
>  
> {code:java}
> transforms = [ksqlCreateKey, ksqlExtractString]
> transforms.ksqlCreateKey.fields = [ID]
> transforms.ksqlCreateKey.type = class 
> org.apache.kafka.connect.transforms.ValueToKey
> transforms.ksqlExtractString.field = ID
> transforms.ksqlExtractString.type = class 
> org.apache.kafka.connect.transforms.ExtractField$Key
> {code}
> If the field name is {{id}} rather than {{ID}}, then the task fails:
> {code:java}
> org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error 
> handler
>at 
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
>at 
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
>at 
> org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
>at 
> org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:293)
>at 
> org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:229)
>at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
>at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
>at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NullPointerException
>at 
> org.apache.kafka.connect.transforms.ValueToKey.applyWithSchema(ValueToKey.java:85)
>at org.apache.kafka.connect.transforms.ValueToKey.apply(ValueToKey.java:65)
>at 
> org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
>at 
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
>at 
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
>... 11 more
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9024) org.apache.kafka.connect.transforms.ValueToKey throws NPE

2020-01-21 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9024?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17020541#comment-17020541
 ] 

ASF GitHub Bot commented on KAFKA-9024:
---

rhauch commented on pull request #7819: KAFKA-9024: Better error message when 
field specified does not exist
URL: https://github.com/apache/kafka/pull/7819
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> org.apache.kafka.connect.transforms.ValueToKey throws NPE
> -
>
> Key: KAFKA-9024
> URL: https://issues.apache.org/jira/browse/KAFKA-9024
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Robin Moffatt
>Assignee: Nigel Liang
>Priority: Minor
>
> If a field named in the SMT does not exist, an NPE is thrown. This is not 
> helpful to users; it should be caught correctly and reported back in a more 
> friendly way.
> For example, importing data from a database with this transform: 
>  
> {code:java}
> transforms = [ksqlCreateKey, ksqlExtractString]
> transforms.ksqlCreateKey.fields = [ID]
> transforms.ksqlCreateKey.type = class 
> org.apache.kafka.connect.transforms.ValueToKey
> transforms.ksqlExtractString.field = ID
> transforms.ksqlExtractString.type = class 
> org.apache.kafka.connect.transforms.ExtractField$Key
> {code}
> If the field name is {{id}} rather than {{ID}}, then the task fails:
> {code:java}
> org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error 
> handler
>at 
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
>at 
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
>at 
> org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
>at 
> org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:293)
>at 
> org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:229)
>at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
>at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
>at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NullPointerException
>at 
> org.apache.kafka.connect.transforms.ValueToKey.applyWithSchema(ValueToKey.java:85)
>at org.apache.kafka.connect.transforms.ValueToKey.apply(ValueToKey.java:65)
>at 
> org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
>at 
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
>at 
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
>... 11 more
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-9083) Various parsing issues in Values class

2020-01-21 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9083?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch resolved KAFKA-9083.
--
  Reviewer: Randall Hauch
Resolution: Fixed

Merged to the `trunk`, `2.4`, `2.3`, and `2.2` branches; we typically don't 
push bugfixes back further than 2-3 branches.

> Various parsing issues in Values class
> --
>
> Key: KAFKA-9083
> URL: https://issues.apache.org/jira/browse/KAFKA-9083
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 2.2.3, 2.5.0, 2.3.2, 2.4.1
>
>
> There are a number of small issues with the Connect framework's {{Values}} 
> class that lead to either unexpected exceptions, unintuitive (and arguably 
> incorrect) behavior, or confusing log messages. These include:
>  * A {{NullPointerException}} is thrown when parsing the string {{[null]}} 
> (which should be parsed as an array containing a single null element)
>  * A {{NullPointerException}} is thrown when parsing the string {{[]}} (which 
> should be parsed as an empty array)
>  * The returned schema is null when parsing the string {{{}}} (instead, it 
> should be a map schema, possibly with null key and value schemas)
>  * Strings that begin with what appear to be booleans (i.e., the literals 
> {{true}} or {{false}}) and which are followed by token delimiters (e.g., {{}}}, 
> {{]}}, {{:}}, etc.) are parsed as booleans when they should arguably be 
> parsed as strings; for example, the string {{true}}} is parsed as the boolean 
> {{true}} but should probably just be parsed as the string {{true}}}
>  * Arrays not containing commas are still parsed as arrays in some cases; for 
> example, the string {{[0 1 2]}} is parsed as the array {{[0, 1, 2]}} when it 
> should arguably be parsed as the string literal {{[0 1 2]}}
>  * An exception is logged when attempting to parse input as a map when it 
> doesn't contain a final closing brace; the exception states "Map is missing 
> terminating ']'" even though the expected terminating character is actually 
> {{}}} and not {{]}}
>  * Finally, and possibly most contentious, escape sequences are not stripped 
> from string literals. Thus, the input string 
> {{foobar]}} is parsed as the literal string 
> {{foobar]}}, which is somewhat unexpected, since 
> that means it is impossible to pass in a string that is parsed as the string 
> literal {{foobar]}}, and it is the job of the caller to handle 
> stripping of such escape sequences. Given that the escape sequence can itself 
> be escaped, it seems better to automatically strip it from parsed string 
> literals before returning them to the user.
>  
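A short sketch exercising the first two cases above, assuming the public static 
{{Values.parseString(String)}} API (it returns a {{SchemaAndValue}}); on an 
unfixed build the two calls below throw {{NullPointerException}} per the list 
above:

{code:java}
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.Values;

public class ValuesParsingProbe {
    public static void main(String[] args) {
        // Expected: an array containing a single null element.
        SchemaAndValue singleNull = Values.parseString("[null]");
        // Expected: an empty array.
        SchemaAndValue emptyArray = Values.parseString("[]");
        System.out.println(singleNull.value());
        System.out.println(emptyArray.value());
    }
}
{code}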



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9458) Kafka crashed in windows environment

2020-01-21 Thread hirik (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17020534#comment-17020534
 ] 

hirik commented on KAFKA-9458:
--

[~manme...@gmail.com], I looked at the code; if somebody could help me to 
understand the existing flow, I can work on this.

> Kafka crashed in windows environment
> 
>
> Key: KAFKA-9458
> URL: https://issues.apache.org/jira/browse/KAFKA-9458
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.4.0
> Environment: Windows Server 2019
>Reporter: hirik
>Priority: Major
> Attachments: logs.zip
>
>
> Hi,
> while I was trying to validate the Kafka retention policy, the Kafka server 
> crashed with the below exception trace. 
> [2020-01-21 17:10:40,475] INFO [Log partition=test1-3, 
> dir=C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka] 
> Rolled new log segment at offset 1 in 52 ms. (kafka.log.Log)
> [2020-01-21 17:10:40,484] ERROR Error while deleting segments for test1-3 in 
> dir C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka 
> (kafka.server.LogDirFailureChannel)
> java.nio.file.FileSystemException: 
> C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka\test1-3\.timeindex
>  -> 
> C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka\test1-3\.timeindex.deleted:
>  The process cannot access the file because it is being used by another 
> process.
> at 
> java.base/sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:92)
>  at 
> java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:103)
>  at java.base/sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:395)
>  at 
> java.base/sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:292)
>  at java.base/java.nio.file.Files.move(Files.java:1425)
>  at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:795)
>  at kafka.log.AbstractIndex.renameTo(AbstractIndex.scala:209)
>  at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:497)
>  at kafka.log.Log.$anonfun$deleteSegmentFiles$1(Log.scala:2206)
>  at kafka.log.Log.$anonfun$deleteSegmentFiles$1$adapted(Log.scala:2206)
>  at scala.collection.immutable.List.foreach(List.scala:305)
>  at kafka.log.Log.deleteSegmentFiles(Log.scala:2206)
>  at kafka.log.Log.removeAndDeleteSegments(Log.scala:2191)
>  at kafka.log.Log.$anonfun$deleteSegments$2(Log.scala:1700)
>  at scala.runtime.java8.JFunction0$mcI$sp.apply(JFunction0$mcI$sp.scala:17)
>  at kafka.log.Log.maybeHandleIOException(Log.scala:2316)
>  at kafka.log.Log.deleteSegments(Log.scala:1691)
>  at kafka.log.Log.deleteOldSegments(Log.scala:1686)
>  at kafka.log.Log.deleteRetentionMsBreachedSegments(Log.scala:1763)
>  at kafka.log.Log.deleteOldSegments(Log.scala:1753)
>  at kafka.log.LogManager.$anonfun$cleanupLogs$3(LogManager.scala:982)
>  at kafka.log.LogManager.$anonfun$cleanupLogs$3$adapted(LogManager.scala:979)
>  at scala.collection.immutable.List.foreach(List.scala:305)
>  at kafka.log.LogManager.cleanupLogs(LogManager.scala:979)
>  at kafka.log.LogManager.$anonfun$startup$2(LogManager.scala:403)
>  at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:116)
>  at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:65)
>  at 
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>  at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
>  at 
> java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
>  at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>  at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>  at java.base/java.lang.Thread.run(Thread.java:830)
>  Suppressed: java.nio.file.FileSystemException: 
> C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka\test1-3\.timeindex
>  -> 
> C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka\test1-3\.timeindex.deleted:
>  The process cannot access the file because it is being used by another 
> process.
> at 
> java.base/sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:92)
>  at 
> java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:103)
>  at java.base/sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:309)
>  at 
> java.base/sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:292)
>  at java.base/java.nio.file.Files.move(Files.java:1425)
>  at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:792)
>  ... 27 more
> [2020-01-21 17:10:40,495] INFO [ReplicaManager broker=0] Stopping serving 
> 

[jira] [Updated] (KAFKA-9083) Various parsing issues in Values class

2020-01-21 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9083?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-9083:
-
Fix Version/s: 2.4.1
   2.3.2
   2.5.0
   2.2.3

> Various parsing issues in Values class
> --
>
> Key: KAFKA-9083
> URL: https://issues.apache.org/jira/browse/KAFKA-9083
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 2.2.3, 2.5.0, 2.3.2, 2.4.1
>
>
> There are a number of small issues with the Connect framework's {{Values}} 
> class that lead to either unexpected exceptions, unintuitive (and arguably 
> incorrect) behavior, or confusing log messages. These include:
>  * A {{NullPointerException}} is thrown when parsing the string {{[null]}} 
> (which should be parsed as an array containing a single null element)
>  * A {{NullPointerException}} is thrown when parsing the string {{[]}} (which 
> should be parsed as an empty array)
>  * The returned schema is null when parsing the string {{{}}} (instead, it 
> should be a map schema, possibly with null key and value schemas)
>  * Strings that begin with what appear to be booleans (i.e., the literals 
> {{true}} or {{false}}) and which are followed by token delimiters (e.g., {{}}}, 
> {{]}}, {{:}}, etc.) are parsed as booleans when they should arguably be 
> parsed as strings; for example, the string {{true}}} is parsed as the boolean 
> {{true}} but should probably just be parsed as the string {{true}}}
>  * Arrays not containing commas are still parsed as arrays in some cases; for 
> example, the string {{[0 1 2]}} is parsed as the array {{[0, 1, 2]}} when it 
> should arguably be parsed as the string literal {{[0 1 2]}}
>  * An exception is logged when attempting to parse input as a map when it 
> doesn't contain a final closing brace; the exception states "Map is missing 
> terminating ']'" even though the expected terminating character is actually 
> {{}}} and not {{]}}
>  * Finally, and possibly most contentious, escape sequences are not stripped 
> from string literals. Thus, the input string 
> {{foobar]}} is parsed as the literal string 
> {{foobar]}}, which is somewhat unexpected, since 
> that means it is impossible to pass in a string that is parsed as the string 
> literal {{foobar]}}, and it is the job of the caller to handle 
> stripping of such escape sequences. Given that the escape sequence can itself 
> be escaped, it seems better to automatically strip it from parsed string 
> literals before returning them to the user.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-9427) StateRestoreListener.onRestoreEnd should report actual message count

2020-01-21 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9427?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang reassigned KAFKA-9427:


Assignee: Guozhang Wang

> StateRestoreListener.onRestoreEnd should report actual message count
> 
>
> Key: KAFKA-9427
> URL: https://issues.apache.org/jira/browse/KAFKA-9427
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Chris Stromberger
>Assignee: Guozhang Wang
>Priority: Minor
>
> {{StateRestoreListener.onRestoreEnd}} appears to report the difference between 
> offsets as "totalRestored", which may differ from the actual number of 
> messages restored to a state store. Am assuming this is due to missing 
> offsets in compacted topics. It would be more useful if 
> {{StateRestoreListener.onRestoreEnd}} reported the actual count of 
> messages restored (the sum of the values reported by 
> {{StateRestoreListener.onBatchRestored}}).
> Was asked to create this ticket in Slack thread 
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1578956151094200]
>  
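For reference, a minimal sketch of a listener that derives the true count by 
summing the {{onBatchRestored}} values itself (the class name is illustrative):

{code:java}
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.StateRestoreListener;

public class CountingRestoreListener implements StateRestoreListener {
    private final Map<TopicPartition, Long> restoredCounts = new HashMap<>();

    @Override
    public void onRestoreStart(TopicPartition partition, String storeName,
                               long startingOffset, long endingOffset) {
        restoredCounts.put(partition, 0L);
    }

    @Override
    public void onBatchRestored(TopicPartition partition, String storeName,
                                long batchEndOffset, long numRestored) {
        // numRestored counts the records in this batch, so the running sum stays
        // accurate even when compaction leaves gaps in the changelog offsets.
        restoredCounts.merge(partition, numRestored, Long::sum);
    }

    @Override
    public void onRestoreEnd(TopicPartition partition, String storeName,
                             long totalRestored) {
        // totalRestored may just be an offset delta; prefer the summed batch counts.
        System.out.printf("Restored %d records into %s for %s (reported: %d)%n",
                restoredCounts.get(partition), storeName, partition, totalRestored);
    }
}
{code}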



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9427) StateRestoreListener.onRestoreEnd should report actual message count

2020-01-21 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9427?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17020529#comment-17020529
 ] 

Guozhang Wang commented on KAFKA-9427:
--

Will do this as part of the KAFKA-9113 cleanup.

> StateRestoreListener.onRestoreEnd should report actual message count
> 
>
> Key: KAFKA-9427
> URL: https://issues.apache.org/jira/browse/KAFKA-9427
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Chris Stromberger
>Assignee: Guozhang Wang
>Priority: Minor
>
> {{StateRestoreListener.onRestoreEnd}} appears to report the difference between 
> offsets as "totalRestored", which may differ from the actual number of 
> messages restored to a state store. I am assuming this is due to missing 
> offsets in compacted topics. It would be more useful if 
> {{StateRestoreListener.onRestoreEnd}} reported the actual count of 
> messages restored (the sum of values reported by 
> {{StateRestoreListener.onBatchRestored}}).
> Was asked to create this ticket in Slack thread 
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1578956151094200]
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-7204) MockConsumer poll clears all records in poll(), including records for subscriptions that are paused.

2020-01-21 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7204?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17020521#comment-17020521
 ] 

ASF GitHub Bot commented on KAFKA-7204:
---

guozhangwang commented on pull request #7505: KAFKA-7204: Avoid clearing 
records for paused partitions on poll of MockConsumer
URL: https://github.com/apache/kafka/pull/7505
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> MockConsumer poll clears all records in poll(), including records for 
> subscriptions that are paused.
> 
>
> Key: KAFKA-7204
> URL: https://issues.apache.org/jira/browse/KAFKA-7204
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 1.1.0
>Reporter: Subodh Bhattacharjya
>Assignee: Eduardo Pinto
>Priority: Minor
>
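A minimal sketch of the reported behavior (assuming a {{MockConsumer}} from 
before the fix; the topic name and offsets are illustrative): a record buffered 
for a paused partition is dropped by {{poll()}} instead of being retained until 
the partition is resumed:

{code:java}
import java.time.Duration;
import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;

public class PausedMockConsumerSketch {
    public static void main(String[] args) {
        MockConsumer<String, String> consumer =
                new MockConsumer<>(OffsetResetStrategy.EARLIEST);
        TopicPartition tp = new TopicPartition("topic", 0);
        consumer.assign(Collections.singleton(tp));
        consumer.updateBeginningOffsets(Collections.singletonMap(tp, 0L));

        consumer.pause(Collections.singleton(tp));
        consumer.addRecord(new ConsumerRecord<>("topic", 0, 0L, "key", "value"));

        // The partition is paused, so this poll should return nothing and keep
        // the record buffered...
        consumer.poll(Duration.ofMillis(1));
        consumer.resume(Collections.singleton(tp));

        // ...but before the fix this prints 0, because poll() cleared all records.
        System.out.println(consumer.poll(Duration.ofMillis(1)).count());
    }
}
{code}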




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-9083) Various parsing issues in Values class

2020-01-21 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9083?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch reassigned KAFKA-9083:


Assignee: Chris Egerton

> Various parsing issues in Values class
> --
>
> Key: KAFKA-9083
> URL: https://issues.apache.org/jira/browse/KAFKA-9083
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
>
> There are a number of small issues with the Connect framework's {{Values}} 
> class that lead to either unexpected exceptions, unintuitive (and arguably 
> incorrect) behavior, or confusing log messages. These include:
>  * A {{NullPointerException}} is thrown when parsing the string {{[null]}} 
> (which should be parsed as an array containing a single null element)
>  * A {{NullPointerException}} is thrown when parsing the string {{[]}} (which 
> should be parsed as an empty array)
>  * The returned schema is null when parsing the string {{{}}} (instead, it 
> should be a map schema, possibly with null key and value schemas)
>  * Strings that begin with what appear to be booleans (i.e., the literals 
> {{true}} or {{false}}) and which are followed by token delimiters (e.g., {{}}}, 
> {{]}}, {{:}}, etc.) are parsed as booleans when they should arguably be 
> parsed as strings; for example, the string {{true}}} is parsed as the boolean 
> {{true}} but should probably just be parsed as the string {{true}}}
>  * Arrays not containing commas are still parsed as arrays in some cases; for 
> example, the string {{[0 1 2]}} is parsed as the array {{[0, 1, 2]}} when it 
> should arguably be parsed as the string literal {{[0 1 2]}}
>  * An exception is logged when attempting to parse input as a map when it 
> doesn't contain a final closing brace; the exception states "Map is missing 
> terminating ']'" even though the expected terminating character is actually 
> {{}}} and not {{]}}
>  * Finally, and possibly most contentious, escape sequences are not stripped 
> from string literals. Thus, an input string containing an escaped delimiter 
> such as a backslash-escaped {{]}} is parsed as a literal string that still 
> contains the escape sequence, which is somewhat unexpected, since that means 
> it is impossible to pass in a string that is parsed as the corresponding 
> unescaped string literal; it is instead the job of the caller to handle 
> stripping of such escape sequences. Given that the escape sequence can itself 
> be escaped, it seems better to automatically strip it from parsed string 
> literals before returning them to the user.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9083) Various parsing issues in Values class

2020-01-21 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17020510#comment-17020510
 ] 

ASF GitHub Bot commented on KAFKA-9083:
---

rhauch commented on pull request #7593: KAFKA-9083: Various fixes/improvements 
for Connect's Values class
URL: https://github.com/apache/kafka/pull/7593
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Various parsing issues in Values class
> --
>
> Key: KAFKA-9083
> URL: https://issues.apache.org/jira/browse/KAFKA-9083
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Priority: Major
>
> There are a number of small issues with the Connect framework's {{Values}} 
> class that lead to either unexpected exceptions, unintuitive (and arguably 
> incorrect) behavior, or confusing log messages. These include:
>  * A {{NullPointerException}} is thrown when parsing the string {{[null]}} 
> (which should be parsed as an array containing a single null element)
>  * A {{NullPointerException}} is thrown when parsing the string {{[]}} (which 
> should be parsed as an empty array)
>  * The returned schema is null when parsing the string {{{}}} (instead, it 
> should be a map schema, possibly with null key and value schemas)
>  * Strings that begin with what appear to be booleans (i.e., the literals 
> {{true}} or {{false}}) and which are followed by token delimiters (e.g., {{}}}, 
> {{]}}, {{:}}, etc.) are parsed as booleans when they should arguably be 
> parsed as strings; for example, the string {{true}}} is parsed as the boolean 
> {{true}} but should probably just be parsed as the string {{true}}}
>  * Arrays not containing commas are still parsed as arrays in some cases; for 
> example, the string {{[0 1 2]}} is parsed as the array {{[0, 1, 2]}} when it 
> should arguably be parsed as the string literal {{[0 1 2]}}
>  * An exception is logged when attempting to parse input as a map when it 
> doesn't contain a final closing brace; the exception states "Map is missing 
> terminating ']'" even though the expected terminating character is actually 
> {{}}} and not {{]}}
>  * Finally, and possibly most contentious, escape sequences are not stripped 
> from string literals. Thus, an input string containing an escaped delimiter 
> such as a backslash-escaped {{]}} is parsed as a literal string that still 
> contains the escape sequence, which is somewhat unexpected, since that means 
> it is impossible to pass in a string that is parsed as the corresponding 
> unescaped string literal; it is instead the job of the caller to handle 
> stripping of such escape sequences. Given that the escape sequence can itself 
> be escaped, it seems better to automatically strip it from parsed string 
> literals before returning them to the user.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9450) Decouple inner state flushing from committing with EOS

2020-01-21 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17020489#comment-17020489
 ] 

Matthias J. Sax commented on KAFKA-9450:


For EOS, we don't write a checkpoint file, and thus we would also not add the 
metadata as a preserved key in the store – hence, it's unclear to me how 
changing where we store the offset-metadata would help for this ticket. This 
ticket says we don't want to call `innerByteStore#flush` when we call 
`cachingStore#flush` and `changeloggingStore#flush` if EOS is enabled – 
however, stores themselves are agnostic to whether EOS is enabled or not (which 
is a good thing IMHO). Hence, we can only avoid calling `innerByteStore#flush()` 
if we decouple the caching/changelog/innerBytesStores from each other, so that 
the KS runtime does not call a single #flush() on the outer metered store (which 
wraps all other stores and implicitly flushes every wrapped store) but can 
instead access each store layer individually and flush each as needed.

Or do you suggest never calling `innerByteStore#flush()` at all (i.e., for both 
the EOS and non-EOS case)? This might be possible, but it would have a negative 
impact on non-EOS, as it would make the current fault-tolerance mechanism for 
non-EOS less efficient (we would not have a guarantee on commit that data is 
flushed to disk, and might need to recover more data from the changelog topic in 
case of failure). 
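To make the decoupling concrete, a rough sketch (the types and method names are 
illustrative, not the actual Streams internals) of a runtime that addresses each 
store layer individually instead of cascading one outer #flush() through all of 
them:

{code:java}
// Illustrative only: not the real Kafka Streams store interfaces.
interface StoreLayer {
    void flush();
}

class InnerBytesStore implements StoreLayer {
    public void flush() { /* e.g. force a RocksDB memtable flush to disk */ }
}

class ChangeLoggingLayer implements StoreLayer {
    public void flush() { /* send pending records to the changelog topic */ }
}

class CachingLayer implements StoreLayer {
    public void flush() { /* evict dirty cache entries downstream */ }
}

class CommitSketch {
    private final CachingLayer cache = new CachingLayer();
    private final ChangeLoggingLayer changelog = new ChangeLoggingLayer();
    private final InnerBytesStore inner = new InnerBytesStore();
    private final boolean eosEnabled;

    CommitSketch(boolean eosEnabled) { this.eosEnabled = eosEnabled; }

    void onCommit() {
        // Always push cached records through to the changelog on commit...
        cache.flush();
        changelog.flush();
        // ...but only force the inner store to disk when EOS is off, since under
        // EOS the store is wiped and rebuilt after an unclean shutdown anyway.
        if (!eosEnabled) {
            inner.flush();
        }
    }
}
{code}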

> Decouple inner state flushing from committing with EOS
> --
>
> Key: KAFKA-9450
> URL: https://issues.apache.org/jira/browse/KAFKA-9450
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Priority: Major
>
> When EOS is turned on, the commit interval is set quite low (100ms) and all 
> the store layers are flushed during a commit. This is necessary for 
> forwarding records in the cache to the changelog, but unfortunately also 
> forces rocksdb to flush the current memtable before it's full. The result is 
> a large number of small writes to disk, losing the benefits of batching, and 
> a large number of very small L0 files that are likely to slow compaction.
> Since we have to delete the stores and recreate them from scratch anyway during an 
> unclean shutdown with EOS, we may as well skip flushing the innermost 
> StateStore during a commit and only do so during a graceful shutdown, before 
> a rebalance, etc. This is currently blocked on a refactoring of the state 
> store layers to allow decoupling the flush of the caching layer from the 
> actual state store.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-9457) Flaky test org.apache.kafka.common.network.SelectorTest.testGracefulClose

2020-01-21 Thread Rajini Sivaram (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9457?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram resolved KAFKA-9457.
---
  Reviewer: Manikumar
Resolution: Fixed

> Flaky test org.apache.kafka.common.network.SelectorTest.testGracefulClose
> -
>
> Key: KAFKA-9457
> URL: https://issues.apache.org/jira/browse/KAFKA-9457
> Project: Kafka
>  Issue Type: Bug
>  Components: network
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 2.5.0
>
>
> org.apache.kafka.common.network.SelectorTest.testGracefulClose has been 
> failing a lot in PR builds:
> {{java.lang.AssertionError: expected:<1> but was:<0>}}
> {{ at org.junit.Assert.fail(Assert.java:89)}}
> {{ at org.junit.Assert.failNotEquals(Assert.java:835)}}
> {{ at org.junit.Assert.assertEquals(Assert.java:647)}}
> {{ at org.junit.Assert.assertEquals(Assert.java:633)}}
> {{ at 
> org.apache.kafka.common.network.SelectorTest.testGracefulClose(SelectorTest.java:588)}}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9457) Flaky test org.apache.kafka.common.network.SelectorTest.testGracefulClose

2020-01-21 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17020313#comment-17020313
 ] 

ASF GitHub Bot commented on KAFKA-9457:
---

rajinisivaram commented on pull request #7989: KAFKA-9457; Fix flaky test 
org.apache.kafka.common.network.SelectorTest.testGracefulClose
URL: https://github.com/apache/kafka/pull/7989
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Flaky test org.apache.kafka.common.network.SelectorTest.testGracefulClose
> -
>
> Key: KAFKA-9457
> URL: https://issues.apache.org/jira/browse/KAFKA-9457
> Project: Kafka
>  Issue Type: Bug
>  Components: network
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 2.5.0
>
>
> org.apache.kafka.common.network.SelectorTest.testGracefulClose has been 
> failing a lot in PR builds:
> {{java.lang.AssertionError: expected:<1> but was:<0>}}
> {{ at org.junit.Assert.fail(Assert.java:89)}}
> {{ at org.junit.Assert.failNotEquals(Assert.java:835)}}
> {{ at org.junit.Assert.assertEquals(Assert.java:647)}}
> {{ at org.junit.Assert.assertEquals(Assert.java:633)}}
> {{ at 
> org.apache.kafka.common.network.SelectorTest.testGracefulClose(SelectorTest.java:588)}}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9461) Limit DEBUG statement size when logging failed record value

2020-01-21 Thread Ted Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17020262#comment-17020262
 ] 

Ted Yu commented on KAFKA-9461:
---

Since no size threshold guarantees we see the complete record, we can use a 
hardcoded value when the limit is added.
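A minimal sketch of such a guard (the helper name and the 1 KB cap are 
illustrative), truncating the rendered record before it reaches the appender:

{code:java}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TruncatingLogHelper {
    private static final Logger log = LoggerFactory.getLogger(TruncatingLogHelper.class);
    // Illustrative hardcoded cap; a 70MB record would otherwise be buffered whole by log4j.
    private static final int MAX_LOGGED_CHARS = 1024;

    public static void debugRecord(Object record) {
        if (!log.isDebugEnabled()) {
            return;
        }
        String rendered = String.valueOf(record);
        if (rendered.length() > MAX_LOGGED_CHARS) {
            rendered = rendered.substring(0, MAX_LOGGED_CHARS)
                    + "... (" + (rendered.length() - MAX_LOGGED_CHARS) + " chars truncated)";
        }
        log.debug("Failed record: {}", rendered);
    }
}
{code}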

> Limit DEBUG statement size when logging failed record value
> ---
>
> Key: KAFKA-9461
> URL: https://issues.apache.org/jira/browse/KAFKA-9461
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 2.4.0
>Reporter: Nicolas Guyomar
>Priority: Minor
>
> Hi,
> It is possible with the current implementation that we log a full record 
> content at DEBUG level, which can overwhelm the log4j buffer and OOM it: 
> That stack trace was due to a 70MB message refused by a broker 
> {code:java}
> java.lang.OutOfMemoryError: Java heap space
> at java.util.Arrays.copyOf(Arrays.java:3332)
> at 
> java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124)
> at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448)
> at java.lang.StringBuffer.append(StringBuffer.java:270)
> at 
> org.apache.log4j.helpers.PatternParser$LiteralPatternConverter.format(PatternParser.java:419)
> at org.apache.log4j.PatternLayout.format(PatternLayout.java:506)
> at org.apache.log4j.WriterAppender.subAppend(WriterAppender.java:310)
> at 
> org.apache.log4j.RollingFileAppender.subAppend(RollingFileAppender.java:276)
> at org.apache.log4j.WriterAppender.append(WriterAppender.java:162)
> at org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251)
> at 
> org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66)
> at org.apache.log4j.Category.callAppenders(Category.java:206)
> at org.apache.log4j.Category.forcedLog(Category.java:391)
> at org.apache.log4j.Category.log(Category.java:856)
> at org.slf4j.impl.Log4jLoggerAdapter.debug(Log4jLoggerAdapter.java:252)
> at 
> org.apache.kafka.connect.runtime.WorkerSourceTask$1.onCompletion(WorkerSourceTask.java:330){code}
>   in  
> [https://github.com/apache/kafka/blob/da4337271ef0b72643c0cf47ae51e69f79cf1901/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L348]
> Would it make sense to protect Connect directly in the ConnectRecord 
> toString() method and set a configurable limit? 
>  Thank you
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9461) Limit DEBUG statement size when logging failed record value

2020-01-21 Thread Nicolas Guyomar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9461?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nicolas Guyomar updated KAFKA-9461:
---
Description: 
Hi,

It is possible with the current implementation that we log a full record 
content at DEBUG level, which can overwhelm the log4j buffer and OOM it: 

That stack trace was due to a 70MB message refused by a broker 
{code:java}
java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:3332)
at 
java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124)
at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448)
at java.lang.StringBuffer.append(StringBuffer.java:270)
at 
org.apache.log4j.helpers.PatternParser$LiteralPatternConverter.format(PatternParser.java:419)
at org.apache.log4j.PatternLayout.format(PatternLayout.java:506)
at org.apache.log4j.WriterAppender.subAppend(WriterAppender.java:310)
at org.apache.log4j.RollingFileAppender.subAppend(RollingFileAppender.java:276)
at org.apache.log4j.WriterAppender.append(WriterAppender.java:162)
at org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251)
at 
org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66)
at org.apache.log4j.Category.callAppenders(Category.java:206)
at org.apache.log4j.Category.forcedLog(Category.java:391)
at org.apache.log4j.Category.log(Category.java:856)
at org.slf4j.impl.Log4jLoggerAdapter.debug(Log4jLoggerAdapter.java:252)
at 
org.apache.kafka.connect.runtime.WorkerSourceTask$1.onCompletion(WorkerSourceTask.java:330){code}
  in  
[https://github.com/apache/kafka/blob/da4337271ef0b72643c0cf47ae51e69f79cf1901/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L348]

Would it make sense to protect Connect directly in the ConnectRecord toString() 
method and set a configurable limit? 

 Thank you

 

 

  was:
Hi,

It is possible with the current implementation that we log a full record 
content at DEBUG level, which can overwhelm the log4j buffer and OOM it: 

That stack trace was due to a 70MB message refused by a broker

 
{code:java}
java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:3332)
at 
java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124)
at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448)
at java.lang.StringBuffer.append(StringBuffer.java:270)
at 
org.apache.log4j.helpers.PatternParser$LiteralPatternConverter.format(PatternParser.java:419)
at org.apache.log4j.PatternLayout.format(PatternLayout.java:506)
at org.apache.log4j.WriterAppender.subAppend(WriterAppender.java:310)
at org.apache.log4j.RollingFileAppender.subAppend(RollingFileAppender.java:276)
at org.apache.log4j.WriterAppender.append(WriterAppender.java:162)
at org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251)
at 
org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66)
at org.apache.log4j.Category.callAppenders(Category.java:206)
at org.apache.log4j.Category.forcedLog(Category.java:391)
at org.apache.log4j.Category.log(Category.java:856)
at org.slf4j.impl.Log4jLoggerAdapter.debug(Log4jLoggerAdapter.java:252)
at 
org.apache.kafka.connect.runtime.WorkerSourceTask$1.onCompletion(WorkerSourceTask.java:330){code}
 

 

Would it make sense to protect Connect directly in the ConnectRecord toString() 
method and set a configurable limit? 

 

Thank you

 

 

 
[https://github.com/apache/kafka/blob/da4337271ef0b72643c0cf47ae51e69f79cf1901/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L348]

 


> Limit DEBUG statement size when logging failed record value
> ---
>
> Key: KAFKA-9461
> URL: https://issues.apache.org/jira/browse/KAFKA-9461
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 2.4.0
>Reporter: Nicolas Guyomar
>Priority: Minor
>
> Hi,
> It is possible with the current implementation that we log a full record 
> content at DEBUG level, which can overwhelm the log4j buffer and OOM it: 
> That stack trace was due to a 70MB message refused by a broker 
> {code:java}
> java.lang.OutOfMemoryError: Java heap space
> at java.util.Arrays.copyOf(Arrays.java:3332)
> at 
> java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124)
> at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448)
> at java.lang.StringBuffer.append(StringBuffer.java:270)
> at 
> org.apache.log4j.helpers.PatternParser$LiteralPatternConverter.format(PatternParser.java:419)
> at org.apache.log4j.PatternLayout.format(PatternLayout.java:506)
> at org.apache.log4j.WriterAppender.subAppend(WriterAppender.java:310)
> at 
> 

[jira] [Created] (KAFKA-9461) Limit DEBUG statement size when logging failed record value

2020-01-21 Thread Nicolas Guyomar (Jira)
Nicolas Guyomar created KAFKA-9461:
--

 Summary: Limit DEBUG statement size when logging failed record 
value
 Key: KAFKA-9461
 URL: https://issues.apache.org/jira/browse/KAFKA-9461
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Affects Versions: 2.4.0
Reporter: Nicolas Guyomar


Hi,

It is possible with the current implementation that we log a full record 
content at DEBUG level, which can overwhelm the log4j buffer and OOM it: 

That stack trace was due to a 70MB message refused by a broker

 
{code:java}
java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:3332)
at 
java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124)
at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448)
at java.lang.StringBuffer.append(StringBuffer.java:270)
at 
org.apache.log4j.helpers.PatternParser$LiteralPatternConverter.format(PatternParser.java:419)
at org.apache.log4j.PatternLayout.format(PatternLayout.java:506)
at org.apache.log4j.WriterAppender.subAppend(WriterAppender.java:310)
at org.apache.log4j.RollingFileAppender.subAppend(RollingFileAppender.java:276)
at org.apache.log4j.WriterAppender.append(WriterAppender.java:162)
at org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251)
at 
org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66)
at org.apache.log4j.Category.callAppenders(Category.java:206)
at org.apache.log4j.Category.forcedLog(Category.java:391)
at org.apache.log4j.Category.log(Category.java:856)
at org.slf4j.impl.Log4jLoggerAdapter.debug(Log4jLoggerAdapter.java:252)
at 
org.apache.kafka.connect.runtime.WorkerSourceTask$1.onCompletion(WorkerSourceTask.java:330){code}
 

 

Would it make sense to protect Connect directly in the ConnectRecord toString() 
method and set a configurable limit? 

 

Thank you

 

 

 
[https://github.com/apache/kafka/blob/da4337271ef0b72643c0cf47ae51e69f79cf1901/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L348]

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9458) Kafka crashed in windows environment

2020-01-21 Thread M. Manna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17020218#comment-17020218
 ] 

M. Manna commented on KAFKA-9458:
-

[~hirik] I am not sure if that was the right PR. I also see that someone 
provided [https://github.com/apache/kafka/pull/6403] in the comments (around 
September 2019). Bottom line, it's probably better to run Kafka in Docker or 
some Linux environment; with Windows, nothing can be tested for certain. I must 
admit that I haven't gone past the verbosity of Kafka's log cleaner logic, as 
it's mixed Scala and Java. You are welcome to investigate and let us know what 
you find.

> Kafka crashed in windows environment
> 
>
> Key: KAFKA-9458
> URL: https://issues.apache.org/jira/browse/KAFKA-9458
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.4.0
> Environment: Windows Server 2019
>Reporter: hirik
>Priority: Major
> Attachments: logs.zip
>
>
> Hi,
> while I was trying to validate Kafka retention policy, Kafka Server crashed 
> with below exception trace. 
> [2020-01-21 17:10:40,475] INFO [Log partition=test1-3, 
> dir=C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka] 
> Rolled new log segment at offset 1 in 52 ms. (kafka.log.Log)
> [2020-01-21 17:10:40,484] ERROR Error while deleting segments for test1-3 in 
> dir C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka 
> (kafka.server.LogDirFailureChannel)
> java.nio.file.FileSystemException: 
> C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka\test1-3\.timeindex
>  -> 
> C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka\test1-3\.timeindex.deleted:
>  The process cannot access the file because it is being used by another 
> process.
> at 
> java.base/sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:92)
>  at 
> java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:103)
>  at java.base/sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:395)
>  at 
> java.base/sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:292)
>  at java.base/java.nio.file.Files.move(Files.java:1425)
>  at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:795)
>  at kafka.log.AbstractIndex.renameTo(AbstractIndex.scala:209)
>  at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:497)
>  at kafka.log.Log.$anonfun$deleteSegmentFiles$1(Log.scala:2206)
>  at kafka.log.Log.$anonfun$deleteSegmentFiles$1$adapted(Log.scala:2206)
>  at scala.collection.immutable.List.foreach(List.scala:305)
>  at kafka.log.Log.deleteSegmentFiles(Log.scala:2206)
>  at kafka.log.Log.removeAndDeleteSegments(Log.scala:2191)
>  at kafka.log.Log.$anonfun$deleteSegments$2(Log.scala:1700)
>  at scala.runtime.java8.JFunction0$mcI$sp.apply(JFunction0$mcI$sp.scala:17)
>  at kafka.log.Log.maybeHandleIOException(Log.scala:2316)
>  at kafka.log.Log.deleteSegments(Log.scala:1691)
>  at kafka.log.Log.deleteOldSegments(Log.scala:1686)
>  at kafka.log.Log.deleteRetentionMsBreachedSegments(Log.scala:1763)
>  at kafka.log.Log.deleteOldSegments(Log.scala:1753)
>  at kafka.log.LogManager.$anonfun$cleanupLogs$3(LogManager.scala:982)
>  at kafka.log.LogManager.$anonfun$cleanupLogs$3$adapted(LogManager.scala:979)
>  at scala.collection.immutable.List.foreach(List.scala:305)
>  at kafka.log.LogManager.cleanupLogs(LogManager.scala:979)
>  at kafka.log.LogManager.$anonfun$startup$2(LogManager.scala:403)
>  at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:116)
>  at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:65)
>  at 
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>  at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
>  at 
> java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
>  at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>  at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>  at java.base/java.lang.Thread.run(Thread.java:830)
>  Suppressed: java.nio.file.FileSystemException: 
> C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka\test1-3\.timeindex
>  -> 
> C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka\test1-3\.timeindex.deleted:
>  The process cannot access the file because it is being used by another 
> process.
> at 
> java.base/sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:92)
>  at 
> java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:103)
>  at 

[jira] [Commented] (KAFKA-9458) Kafka crashed in windows environment

2020-01-21 Thread hirik (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17020215#comment-17020215
 ] 

hirik commented on KAFKA-9458:
--

[~manme...@gmail.com] thanks for the quick reply. I found the pull request 
below 
([https://github.com/apache/kafka/pull/6403/commits/89a36884a3738e925ab4604136054bbf0108447c])
 from the KAFKA-6188 thread, but it is still not integrated. 

> Kafka crashed in windows environment
> 
>
> Key: KAFKA-9458
> URL: https://issues.apache.org/jira/browse/KAFKA-9458
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.4.0
> Environment: Windows Server 2019
>Reporter: hirik
>Priority: Major
> Attachments: logs.zip
>
>
> Hi,
> while I was trying to validate Kafka retention policy, Kafka Server crashed 
> with below exception trace. 
> [2020-01-21 17:10:40,475] INFO [Log partition=test1-3, 
> dir=C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka] 
> Rolled new log segment at offset 1 in 52 ms. (kafka.log.Log)
> [2020-01-21 17:10:40,484] ERROR Error while deleting segments for test1-3 in 
> dir C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka 
> (kafka.server.LogDirFailureChannel)
> java.nio.file.FileSystemException: 
> C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka\test1-3\.timeindex
>  -> 
> C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka\test1-3\.timeindex.deleted:
>  The process cannot access the file because it is being used by another 
> process.
> at 
> java.base/sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:92)
>  at 
> java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:103)
>  at java.base/sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:395)
>  at 
> java.base/sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:292)
>  at java.base/java.nio.file.Files.move(Files.java:1425)
>  at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:795)
>  at kafka.log.AbstractIndex.renameTo(AbstractIndex.scala:209)
>  at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:497)
>  at kafka.log.Log.$anonfun$deleteSegmentFiles$1(Log.scala:2206)
>  at kafka.log.Log.$anonfun$deleteSegmentFiles$1$adapted(Log.scala:2206)
>  at scala.collection.immutable.List.foreach(List.scala:305)
>  at kafka.log.Log.deleteSegmentFiles(Log.scala:2206)
>  at kafka.log.Log.removeAndDeleteSegments(Log.scala:2191)
>  at kafka.log.Log.$anonfun$deleteSegments$2(Log.scala:1700)
>  at scala.runtime.java8.JFunction0$mcI$sp.apply(JFunction0$mcI$sp.scala:17)
>  at kafka.log.Log.maybeHandleIOException(Log.scala:2316)
>  at kafka.log.Log.deleteSegments(Log.scala:1691)
>  at kafka.log.Log.deleteOldSegments(Log.scala:1686)
>  at kafka.log.Log.deleteRetentionMsBreachedSegments(Log.scala:1763)
>  at kafka.log.Log.deleteOldSegments(Log.scala:1753)
>  at kafka.log.LogManager.$anonfun$cleanupLogs$3(LogManager.scala:982)
>  at kafka.log.LogManager.$anonfun$cleanupLogs$3$adapted(LogManager.scala:979)
>  at scala.collection.immutable.List.foreach(List.scala:305)
>  at kafka.log.LogManager.cleanupLogs(LogManager.scala:979)
>  at kafka.log.LogManager.$anonfun$startup$2(LogManager.scala:403)
>  at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:116)
>  at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:65)
>  at 
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>  at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
>  at 
> java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
>  at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>  at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>  at java.base/java.lang.Thread.run(Thread.java:830)
>  Suppressed: java.nio.file.FileSystemException: 
> C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka\test1-3\.timeindex
>  -> 
> C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka\test1-3\.timeindex.deleted:
>  The process cannot access the file because it is being used by another 
> process.
> at 
> java.base/sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:92)
>  at 
> java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:103)
>  at java.base/sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:309)
>  at 
> java.base/sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:292)
>  at 

[jira] [Updated] (KAFKA-9460) Enable TLSv1.2 by default and disable all others protocol versions

2020-01-21 Thread Nikolay Izhikov (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9460?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nikolay Izhikov updated KAFKA-9460:
---
Labels: needs-kip  (was: )

> Enable TLSv1.2 by default and disable all others protocol versions
> --
>
> Key: KAFKA-9460
> URL: https://issues.apache.org/jira/browse/KAFKA-9460
> Project: Kafka
>  Issue Type: Improvement
>  Components: security
>Reporter: Nikolay Izhikov
>Assignee: Nikolay Izhikov
>Priority: Major
>  Labels: needs-kip
>
> In KAFKA-7251 support of TLS1.3 was introduced.
> For now, only TLS1.2 and TLS1.3 are recommended for use; other versions 
> of TLS are considered obsolete:
> https://www.rfc-editor.org/info/rfc8446
> https://en.wikipedia.org/wiki/Transport_Layer_Security#History_and_development
> But testing of TLS1.3 is incomplete for now.
> We should enable only current versions of the TLS protocol by default, so that 
> users get only secure implementations.
> Users can still enable obsolete TLS versions via configuration if they 
> want to. 
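For reference, a sketch of the corresponding client/broker settings (the exact 
default set is what this ticket would have to decide, and TLSv1.3 additionally 
assumes a JVM that supports it):

{code:java}
import java.util.Properties;

public class TlsDefaultsSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Allow only the recommended versions; older TLS/SSL versions stay disabled.
        props.put("ssl.enabled.protocols", "TLSv1.2,TLSv1.3");
        // Protocol used to create the SSLContext for the handshake.
        props.put("ssl.protocol", "TLSv1.2");
        System.out.println(props);
    }
}
{code}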



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9460) Enable TLSv1.2 by default and disable all others protocol versions

2020-01-21 Thread Nikolay Izhikov (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9460?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nikolay Izhikov updated KAFKA-9460:
---
Component/s: security

> Enable TLSv1.2 by default and disable all others protocol versions
> --
>
> Key: KAFKA-9460
> URL: https://issues.apache.org/jira/browse/KAFKA-9460
> Project: Kafka
>  Issue Type: Improvement
>  Components: security
>Reporter: Nikolay Izhikov
>Assignee: Nikolay Izhikov
>Priority: Major
>
> In KAFKA-7251 support of TLS1.3 was introduced.
> For now, only TLS1.2 and TLS1.3 are recommended for the usage, other versions 
> of TLS considered as obsolete:
> https://www.rfc-editor.org/info/rfc8446
> https://en.wikipedia.org/wiki/Transport_Layer_Security#History_and_development
> But testing of TLS1.3 is incomplete for now.
> We should enable only current versions of the TLS protocol by default, so that 
> users get only secure implementations.
> Users can still enable obsolete TLS versions via configuration if they 
> want to. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9460) Enable TLSv1.2 by default and disable all others protocol versions

2020-01-21 Thread Nikolay Izhikov (Jira)
Nikolay Izhikov created KAFKA-9460:
--

 Summary: Enable TLSv1.2 by default and disable all others protocol 
versions
 Key: KAFKA-9460
 URL: https://issues.apache.org/jira/browse/KAFKA-9460
 Project: Kafka
  Issue Type: Improvement
Reporter: Nikolay Izhikov
Assignee: Nikolay Izhikov


In KAFKA-7251 support of TLS1.3 was introduced.

For now, only TLS1.2 and TLS1.3 are recommended for use; other versions 
of TLS are considered obsolete:

https://www.rfc-editor.org/info/rfc8446
https://en.wikipedia.org/wiki/Transport_Layer_Security#History_and_development
But testing of TLS1.3 is incomplete for now.

We should enable only current versions of the TLS protocol by default, so that 
users get only secure implementations.

Users can still enable obsolete TLS versions via configuration if they 
want to. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9459) MM2 sync topic config does not work

2020-01-21 Thread Badai Aqrandista (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9459?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Badai Aqrandista updated KAFKA-9459:

Description: 
I have MM2 configured as follows:

{code:java}
{
"name": "mm2-from-1-to-2",
"config": {
  
"connector.class":"org.apache.kafka.connect.mirror.MirrorSourceConnector",
  "topics":"foo",
  "key.converter": 
"org.apache.kafka.connect.converters.ByteArrayConverter",
  "value.converter": 
"org.apache.kafka.connect.converters.ByteArrayConverter",
  "sync.topic.configs.enabled":"true",
  "sync.topic.configs.interval.seconds": 60,
  "sync.topic.acls.enabled": "false",
  "replication.factor": 1,
  "offset-syncs.topic.replication.factor": 1,
  "heartbeats.topic.replication.factor": 1,
  "checkpoints.topic.replication.factor": 1,

  "target.cluster.alias":"dest",
  "target.cluster.bootstrap.servers":"dest.example.com:9092",

  "source.cluster.alias":"src",
  "source.cluster.bootstrap.servers":"src.example.com:9092",

  "tasks.max": 1}
}
{code}

Topic "foo" is configured with "cleanup.policy=compact". But after waiting for 
15 minutes, I still don't see "cleanup.policy=compact" on "src.foo" in the 
destination cluster.

I ran the connect node at TRACE level and could not find any calls to 
describeConfigs 
(https://github.com/apache/kafka/blob/2.4.0/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java#L327).
 This implies it never actually gets the list of topics that it needs to fetch 
topic configs for.

And I suspect this code always returns an empty Set 
(https://github.com/apache/kafka/blob/2.4.0/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java#L214-L220):


{code:java}
private Set<String> topicsBeingReplicated() {
return knownTopicPartitions.stream()
.map(x -> x.topic())
.distinct()
.filter(x -> knownTargetTopics.contains(formatRemoteTopic(x)))
.collect(Collectors.toSet());
}
{code}

knownTopicPartitions contains topic-partitions from the source cluster.
knownTargetTopics contains topic-partitions from the target cluster, whose 
topic names contain source alias already.

So, why is topicsBeingReplicated (list of topic-partitions from source cluster) 
being filtered using knownTargetTopics (list of topic-partitions from target 
cluster)?

  was:
I have MM2 configured as follows:

{code:java}
{
"name": "mm2-from-1-to-2",
"config": {
  
"connector.class":"org.apache.kafka.connect.mirror.MirrorSourceConnector",
  "topics":"foo",
  "key.converter": 
"org.apache.kafka.connect.converters.ByteArrayConverter",
  "value.converter": 
"org.apache.kafka.connect.converters.ByteArrayConverter",
  "sync.topic.configs.enabled":"true",
  "sync.topic.configs.interval.seconds": 60,
  "sync.topic.acls.enabled": "false",
  "replication.factor": 1,
  "offset-syncs.topic.replication.factor": 1,
  "heartbeats.topic.replication.factor": 1,
  "checkpoints.topic.replication.factor": 1,

  "target.cluster.alias":"dest",
  "target.cluster.bootstrap.servers":"dest.example.com:9092",

  "source.cluster.alias":"src",
  "source.cluster.bootstrap.servers":"src.example.com:9092",

  "tasks.max": 1}
}
{code}

Topic "foo" is configured with "cleanup.policy=compact". But after waiting for 
15 minutes, I still don't see "cleanup.policy=compact" on "src.foo" in the 
destination cluster.

I ran the connect node at TRACE level and could not find any calls to 
describeConfigs 
(https://github.com/apache/kafka/blob/2.4.0/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java#L327).
 This implies it never actually gets the list of topics that it needs to fetch 
topic configs for.

And I suspect this code 
(https://github.com/apache/kafka/blob/2.4.0/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java#L214-L220):


{code:java}
private Set<String> topicsBeingReplicated() {
return knownTopicPartitions.stream()
.map(x -> x.topic())
.distinct()
.filter(x -> knownTargetTopics.contains(formatRemoteTopic(x)))
.collect(Collectors.toSet());
}
{code}

knownTopicPartitions contains topic-partitions from the source cluster.
knownTargetTopics contains topic-partitions from the target cluster, whose 
topic names contain source alias already.

So, why is topicsBeingReplicated (list of topic-partitions from source cluster) 
being filtered using knownTargetTopics (list of topic-partitions from target 
cluster)?


> MM2 sync topic config does not work
> ---
>
> 

[jira] [Updated] (KAFKA-9459) MM2 sync topic config does not work

2020-01-21 Thread Badai Aqrandista (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9459?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Badai Aqrandista updated KAFKA-9459:

Summary: MM2 sync topic config does not work  (was: MM2 sync topic config 
does work)

> MM2 sync topic config does not work
> ---
>
> Key: KAFKA-9459
> URL: https://issues.apache.org/jira/browse/KAFKA-9459
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.4.0
>Reporter: Badai Aqrandista
>Priority: Major
>
> I have MM2 configured as follows:
> {code:java}
> {
> "name": "mm2-from-1-to-2",
> "config": {
>   
> "connector.class":"org.apache.kafka.connect.mirror.MirrorSourceConnector",
>   "topics":"foo",
>   "key.converter": 
> "org.apache.kafka.connect.converters.ByteArrayConverter",
>   "value.converter": 
> "org.apache.kafka.connect.converters.ByteArrayConverter",
>   "sync.topic.configs.enabled":"true",
>   "sync.topic.configs.interval.seconds": 60,
>   "sync.topic.acls.enabled": "false",
>   "replication.factor": 1,
>   "offset-syncs.topic.replication.factor": 1,
>   "heartbeats.topic.replication.factor": 1,
>   "checkpoints.topic.replication.factor": 1,
>   "target.cluster.alias":"dest",
>   "target.cluster.bootstrap.servers":"dest.example.com:9092",
>   "source.cluster.alias":"src",
>   "source.cluster.bootstrap.servers":"src.example.com:9092",
>   "tasks.max": 1}
> }
> {code}
> Topic "foo" is configured with "cleanup.policy=compact". But after waiting 
> for 15 minutes, I still don't see "cleanup.policy=compact" on "src.foo" in 
> the destination cluster.
> I ran the connect node at TRACE level and could not find any calls 
> to describeConfigs 
> (https://github.com/apache/kafka/blob/2.4.0/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java#L327).
>  This implies it never actually gets the list of topics that it needs to 
> fetch topic configs for.
> And I suspect this code 
> (https://github.com/apache/kafka/blob/2.4.0/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java#L214-L220):
> {code:java}
> private Set<String> topicsBeingReplicated() {
> return knownTopicPartitions.stream()
> .map(x -> x.topic())
> .distinct()
> .filter(x -> knownTargetTopics.contains(formatRemoteTopic(x)))
> .collect(Collectors.toSet());
> }
> {code}
> knownTopicPartitions contains topic-partitions from the source cluster.
> knownTargetTopics contains topic-partitions from the target cluster, whose 
> topic names contain source alias already.
> So, why is topicsBeingReplicated (list of topic-partitions from source 
> cluster) being filtered using knownTargetTopics (list of topic-partitions 
> from target cluster)?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9459) MM2 sync topic config does work

2020-01-21 Thread Badai Aqrandista (Jira)
Badai Aqrandista created KAFKA-9459:
---

 Summary: MM2 sync topic config does work
 Key: KAFKA-9459
 URL: https://issues.apache.org/jira/browse/KAFKA-9459
 Project: Kafka
  Issue Type: Bug
  Components: mirrormaker
Affects Versions: 2.4.0
Reporter: Badai Aqrandista


I have MM2 configured as follows:

{code:java}
{
"name": "mm2-from-1-to-2",
"config": {
  
"connector.class":"org.apache.kafka.connect.mirror.MirrorSourceConnector",
  "topics":"foo",
  "key.converter": 
"org.apache.kafka.connect.converters.ByteArrayConverter",
  "value.converter": 
"org.apache.kafka.connect.converters.ByteArrayConverter",
  "sync.topic.configs.enabled":"true",
  "sync.topic.configs.interval.seconds": 60,
  "sync.topic.acls.enabled": "false",
  "replication.factor": 1,
  "offset-syncs.topic.replication.factor": 1,
  "heartbeats.topic.replication.factor": 1,
  "checkpoints.topic.replication.factor": 1,

  "target.cluster.alias":"dest",
  "target.cluster.bootstrap.servers":"dest.example.com:9092",

  "source.cluster.alias":"src",
  "source.cluster.bootstrap.servers":"src.example.com:9092",

  "tasks.max": 1}
}
{code}

Topic "foo" is configured with "cleanup.policy=compact". But after waiting for 
15 minutes, I still don't see "cleanup.policy=compact" on "src.foo" in the 
destination cluster.

I ran the connect node at TRACE level and could not find any calls to 
describeConfigs 
(https://github.com/apache/kafka/blob/2.4.0/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java#L327).
 This implies it never actually gets the list of topics that it needs to fetch 
topic configs for.

And I suspect this code 
(https://github.com/apache/kafka/blob/2.4.0/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java#L214-L220):


{code:java}
private Set<String> topicsBeingReplicated() {
return knownTopicPartitions.stream()
.map(x -> x.topic())
.distinct()
.filter(x -> knownTargetTopics.contains(formatRemoteTopic(x)))
.collect(Collectors.toSet());
}
{code}

knownTopicPartitions contains topic-partitions from the source cluster.
knownTargetTopics contains topic-partitions from the target cluster, whose 
topic names contain source alias already.

So, why is topicsBeingReplicated (list of topic-partitions from source cluster) 
being filtered using knownTargetTopics (list of topic-partitions from target 
cluster)?
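One way this filter could always come up empty (an assumption, not something 
confirmed in this ticket): if knownTargetTopics holds TopicPartition objects 
while formatRemoteTopic returns a topic-name String, the contains check can 
never match. A sketch of that type mismatch, with all shapes assumed for 
illustration:

{code:java}
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.common.TopicPartition;

public class ContainsMismatchSketch {
    // Assumed shapes, for illustration only.
    static final Set<TopicPartition> knownTopicPartitions = Set.of(new TopicPartition("foo", 0));
    static final Set<TopicPartition> knownTargetTopics = Set.of(new TopicPartition("src.foo", 0));

    static String formatRemoteTopic(String topic) {
        return "src." + topic;
    }

    public static void main(String[] args) {
        Set<String> result = knownTopicPartitions.stream()
                .map(TopicPartition::topic)
                .distinct()
                // A String can never equal a TopicPartition, so if the target set
                // really holds TopicPartition objects this filter drops everything.
                .filter(t -> knownTargetTopics.contains(formatRemoteTopic(t)))
                .collect(Collectors.toSet());
        System.out.println(result); // prints []
    }
}
{code}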



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9458) Kafka crashed in windows environment

2020-01-21 Thread M. Manna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17020168#comment-17020168
 ] 

M. Manna commented on KAFKA-9458:
-

[~hirik] - This is only an issue for Windows. Kafka isn't fully compatible with 
Windows and the issue has been there for a while. If you would like to fix this 
issue without impacting any platform support, please feel free to do so. 
Otherwise, see the comments on the relevant tickets (KAFKA-1194, KAFKA-6188).

> Kafka crashed in windows environment
> 
>
> Key: KAFKA-9458
> URL: https://issues.apache.org/jira/browse/KAFKA-9458
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.4.0
> Environment: Windows Server 2019
>Reporter: hirik
>Priority: Major
> Attachments: logs.zip
>
>
> Hi,
> while I was trying to validate Kafka retention policy, Kafka Server crashed 
> with below exception trace. 
> [2020-01-21 17:10:40,475] INFO [Log partition=test1-3, 
> dir=C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka] 
> Rolled new log segment at offset 1 in 52 ms. (kafka.log.Log)
> [2020-01-21 17:10:40,484] ERROR Error while deleting segments for test1-3 in 
> dir C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka 
> (kafka.server.LogDirFailureChannel)
> java.nio.file.FileSystemException: 
> C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka\test1-3\.timeindex
>  -> 
> C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka\test1-3\.timeindex.deleted:
>  The process cannot access the file because it is being used by another 
> process.
> at 
> java.base/sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:92)
>  at 
> java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:103)
>  at java.base/sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:395)
>  at 
> java.base/sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:292)
>  at java.base/java.nio.file.Files.move(Files.java:1425)
>  at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:795)
>  at kafka.log.AbstractIndex.renameTo(AbstractIndex.scala:209)
>  at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:497)
>  at kafka.log.Log.$anonfun$deleteSegmentFiles$1(Log.scala:2206)
>  at kafka.log.Log.$anonfun$deleteSegmentFiles$1$adapted(Log.scala:2206)
>  at scala.collection.immutable.List.foreach(List.scala:305)
>  at kafka.log.Log.deleteSegmentFiles(Log.scala:2206)
>  at kafka.log.Log.removeAndDeleteSegments(Log.scala:2191)
>  at kafka.log.Log.$anonfun$deleteSegments$2(Log.scala:1700)
>  at scala.runtime.java8.JFunction0$mcI$sp.apply(JFunction0$mcI$sp.scala:17)
>  at kafka.log.Log.maybeHandleIOException(Log.scala:2316)
>  at kafka.log.Log.deleteSegments(Log.scala:1691)
>  at kafka.log.Log.deleteOldSegments(Log.scala:1686)
>  at kafka.log.Log.deleteRetentionMsBreachedSegments(Log.scala:1763)
>  at kafka.log.Log.deleteOldSegments(Log.scala:1753)
>  at kafka.log.LogManager.$anonfun$cleanupLogs$3(LogManager.scala:982)
>  at kafka.log.LogManager.$anonfun$cleanupLogs$3$adapted(LogManager.scala:979)
>  at scala.collection.immutable.List.foreach(List.scala:305)
>  at kafka.log.LogManager.cleanupLogs(LogManager.scala:979)
>  at kafka.log.LogManager.$anonfun$startup$2(LogManager.scala:403)
>  at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:116)
>  at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:65)
>  at 
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>  at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
>  at 
> java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
>  at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>  at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>  at java.base/java.lang.Thread.run(Thread.java:830)
>  Suppressed: java.nio.file.FileSystemException: 
> C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka\test1-3\.timeindex
>  -> 
> C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka\test1-3\.timeindex.deleted:
>  The process cannot access the file because it is being used by another 
> process.
> at 
> java.base/sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:92)
>  at 
> java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:103)
>  at java.base/sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:309)
>  at 
> java.base/sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:292)
>  at java.base/java.nio.file.Files.move(Files.java:1425)
>  

[jira] [Updated] (KAFKA-9458) Kafka crashed in windows environment

2020-01-21 Thread M. Manna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9458?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

M. Manna updated KAFKA-9458:

Priority: Major  (was: Blocker)

> Kafka crashed in windows environment
> 
>
> Key: KAFKA-9458
> URL: https://issues.apache.org/jira/browse/KAFKA-9458
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.4.0
> Environment: Windows Server 2019
>Reporter: hirik
>Priority: Major
> Attachments: logs.zip
>
>
> Hi,
> while I was trying to validate Kafka retention policy, Kafka Server crashed 
> with below exception trace. 
> [2020-01-21 17:10:40,475] INFO [Log partition=test1-3, 
> dir=C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka] 
> Rolled new log segment at offset 1 in 52 ms. (kafka.log.Log)
> [2020-01-21 17:10:40,484] ERROR Error while deleting segments for test1-3 in 
> dir C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka 
> (kafka.server.LogDirFailureChannel)
> java.nio.file.FileSystemException: 
> C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka\test1-3\.timeindex
>  -> 
> C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka\test1-3\.timeindex.deleted:
>  The process cannot access the file because it is being used by another 
> process.
> [2020-01-21 17:10:40,495] INFO [ReplicaManager broker=0] Stopping serving 
> replicas in dir 
> C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka 
> (kafka.server.ReplicaManager)
> 

[jira] [Commented] (KAFKA-9458) Kafka crashed in windows environment

2020-01-21 Thread M. Manna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17020166#comment-17020166
 ] 

M. Manna commented on KAFKA-9458:
-

This is related to Windows blocking the renaming or deletion of files that are 
still memory-mapped.
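
A minimal sketch of that failure mode, assuming a Windows JVM (the class name 
MmapRenameDemo and the file names are illustrative, not Kafka code): map a file 
into memory, then attempt the same kind of atomic rename Kafka performs when it 
marks a segment index as deleted.
{code:java}
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.*;

// Hypothetical repro sketch, not Kafka code. On Windows the Files.move below
// fails with java.nio.file.FileSystemException ("...being used by another
// process") while the mapping is alive; on Linux the same rename succeeds.
public class MmapRenameDemo {
    public static void main(String[] args) throws IOException {
        Path src = Files.createTempFile("demo", ".timeindex");
        Path dst = src.resolveSibling(src.getFileName() + ".deleted");
        try (FileChannel ch = FileChannel.open(src, StandardOpenOption.READ,
                                               StandardOpenOption.WRITE)) {
            ch.write(ByteBuffer.wrap(new byte[]{1, 2, 3}));
            // Map the file, as Kafka's AbstractIndex does for index files.
            MappedByteBuffer mmap = ch.map(FileChannel.MapMode.READ_WRITE, 0, 3);
            mmap.put(0, (byte) 42);
            // Analogous to Utils.atomicMoveWithFallback for
            // .timeindex -> .timeindex.deleted.
            Files.move(src, dst, StandardCopyOption.ATOMIC_MOVE);
        }
    }
}
{code}
This is the path Kafka hits in Log.deleteSegmentFiles, which is why the failed 
rename ends up failing the whole log directory.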

> Kafka crashed in windows environment
> 
>
> Key: KAFKA-9458
> URL: https://issues.apache.org/jira/browse/KAFKA-9458
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.4.0
> Environment: Windows Server 2019
>Reporter: hirik
>Priority: Blocker
> Attachments: logs.zip
>
>
> Hi,
> while I was trying to validate the Kafka retention policy, the Kafka server 
> crashed with the exception trace below.
> [2020-01-21 17:10:40,475] INFO [Log partition=test1-3, 
> dir=C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka] 
> Rolled new log segment at offset 1 in 52 ms. (kafka.log.Log)
> [2020-01-21 17:10:40,484] ERROR Error while deleting segments for test1-3 in 
> dir C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka 
> (kafka.server.LogDirFailureChannel)
> java.nio.file.FileSystemException: 
> C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka\test1-3\.timeindex
>  -> 
> C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka\test1-3\.timeindex.deleted:
>  The process cannot access the file because it is being used by another 
> process.
> [2020-01-21 17:10:40,495] INFO [ReplicaManager broker=0] Stopping serving 
> replicas in dir 
> 

[jira] [Updated] (KAFKA-9458) Kafka crashed in windows environment

2020-01-21 Thread hirik (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9458?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

hirik updated KAFKA-9458:
-
Environment: Windows Server 2019  (was: Windows 10)

> Kafka crashed in windows environment
> 
>
> Key: KAFKA-9458
> URL: https://issues.apache.org/jira/browse/KAFKA-9458
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.4.0
> Environment: Windows Server 2019
>Reporter: hirik
>Priority: Blocker
> Attachments: logs.zip
>
>
> Hi,
> while I was trying to validate the Kafka retention policy, the Kafka server 
> crashed with the exception trace below.
> [2020-01-21 17:10:40,475] INFO [Log partition=test1-3, 
> dir=C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka] 
> Rolled new log segment at offset 1 in 52 ms. (kafka.log.Log)
> [2020-01-21 17:10:40,484] ERROR Error while deleting segments for test1-3 in 
> dir C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka 
> (kafka.server.LogDirFailureChannel)
> java.nio.file.FileSystemException: 
> C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka\test1-3\.timeindex
>  -> 
> C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka\test1-3\.timeindex.deleted:
>  The process cannot access the file because it is being used by another 
> process.
> [2020-01-21 17:10:40,495] INFO [ReplicaManager broker=0] Stopping serving 
> replicas in dir 
> C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka 
> 

[jira] [Updated] (KAFKA-9458) Kafka crashed in windows environment

2020-01-21 Thread hirik (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9458?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

hirik updated KAFKA-9458:
-
Affects Version/s: 2.4.0

> Kafka crashed in windows environment
> 
>
> Key: KAFKA-9458
> URL: https://issues.apache.org/jira/browse/KAFKA-9458
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.4.0
> Environment: Windows 10
>Reporter: hirik
>Priority: Blocker
> Attachments: logs.zip
>
>
> Hi,
> while I was trying to validate the Kafka retention policy, the Kafka server 
> crashed with the exception trace below.
> [2020-01-21 17:10:40,475] INFO [Log partition=test1-3, 
> dir=C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka] 
> Rolled new log segment at offset 1 in 52 ms. (kafka.log.Log)
> [2020-01-21 17:10:40,484] ERROR Error while deleting segments for test1-3 in 
> dir C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka 
> (kafka.server.LogDirFailureChannel)
> java.nio.file.FileSystemException: 
> C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka\test1-3\.timeindex
>  -> 
> C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka\test1-3\.timeindex.deleted:
>  The process cannot access the file because it is being used by another 
> process.
> [2020-01-21 17:10:40,495] INFO [ReplicaManager broker=0] Stopping serving 
> replicas in dir 
> C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka 
> (kafka.server.ReplicaManager)
> [2020-01-21 

[jira] [Created] (KAFKA-9458) Kafka crashed in windows environment

2020-01-21 Thread hirik (Jira)
hirik created KAFKA-9458:


 Summary: Kafka crashed in windows environment
 Key: KAFKA-9458
 URL: https://issues.apache.org/jira/browse/KAFKA-9458
 Project: Kafka
  Issue Type: Bug
 Environment: Windows 10
Reporter: hirik
 Attachments: logs.zip

Hi,

while I was trying to validate the Kafka retention policy, the Kafka server 
crashed with the exception trace below.

[2020-01-21 17:10:40,475] INFO [Log partition=test1-3, 
dir=C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka] Rolled 
new log segment at offset 1 in 52 ms. (kafka.log.Log)
[2020-01-21 17:10:40,484] ERROR Error while deleting segments for test1-3 in 
dir C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka 
(kafka.server.LogDirFailureChannel)
java.nio.file.FileSystemException: 
C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka\test1-3\.timeindex
 -> 
C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka\test1-3\.timeindex.deleted:
 The process cannot access the file because it is being used by another process.

at 
java.base/sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:92)
 at 
java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:103)
 at java.base/sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:395)
 at 
java.base/sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:292)
 at java.base/java.nio.file.Files.move(Files.java:1425)
 at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:795)
 at kafka.log.AbstractIndex.renameTo(AbstractIndex.scala:209)
 at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:497)
 at kafka.log.Log.$anonfun$deleteSegmentFiles$1(Log.scala:2206)
 at kafka.log.Log.$anonfun$deleteSegmentFiles$1$adapted(Log.scala:2206)
 at scala.collection.immutable.List.foreach(List.scala:305)
 at kafka.log.Log.deleteSegmentFiles(Log.scala:2206)
 at kafka.log.Log.removeAndDeleteSegments(Log.scala:2191)
 at kafka.log.Log.$anonfun$deleteSegments$2(Log.scala:1700)
 at scala.runtime.java8.JFunction0$mcI$sp.apply(JFunction0$mcI$sp.scala:17)
 at kafka.log.Log.maybeHandleIOException(Log.scala:2316)
 at kafka.log.Log.deleteSegments(Log.scala:1691)
 at kafka.log.Log.deleteOldSegments(Log.scala:1686)
 at kafka.log.Log.deleteRetentionMsBreachedSegments(Log.scala:1763)
 at kafka.log.Log.deleteOldSegments(Log.scala:1753)
 at kafka.log.LogManager.$anonfun$cleanupLogs$3(LogManager.scala:982)
 at kafka.log.LogManager.$anonfun$cleanupLogs$3$adapted(LogManager.scala:979)
 at scala.collection.immutable.List.foreach(List.scala:305)
 at kafka.log.LogManager.cleanupLogs(LogManager.scala:979)
 at kafka.log.LogManager.$anonfun$startup$2(LogManager.scala:403)
 at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:116)
 at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:65)
 at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
 at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
 at 
java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
 at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
 at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
 at java.base/java.lang.Thread.run(Thread.java:830)
 Suppressed: java.nio.file.FileSystemException: 
C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka\test1-3\.timeindex
 -> 
C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka\test1-3\.timeindex.deleted:
 The process cannot access the file because it is being used by another process.

at 
java.base/sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:92)
 at 
java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:103)
 at java.base/sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:309)
 at 
java.base/sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:292)
 at java.base/java.nio.file.Files.move(Files.java:1425)
 at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:792)
 ... 27 more
[2020-01-21 17:10:40,495] INFO [ReplicaManager broker=0] Stopping serving 
replicas in dir 
C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka 
(kafka.server.ReplicaManager)
[2020-01-21 17:10:40,495] ERROR Uncaught exception in scheduled task 
'kafka-log-retention' (kafka.utils.KafkaScheduler)
org.apache.kafka.common.errors.KafkaStorageException: Error while deleting 
segments for test1-3 in dir 
C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka
Caused by: java.nio.file.FileSystemException: 

[jira] [Commented] (KAFKA-9423) Refine layout of configuration options on website and make individual settings directly linkable

2020-01-21 Thread Nikazu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17020158#comment-17020158
 ] 

Nikazu commented on KAFKA-9423:
---

I personally like options 2 and 3 the most. I agree that the current format is 
a bit scattered and hard to read. Thanks!

> Refine layout of configuration options on website and make individual 
> settings directly linkable
> 
>
> Key: KAFKA-9423
> URL: https://issues.apache.org/jira/browse/KAFKA-9423
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation
>Reporter: Sönke Liebau
>Assignee: Sönke Liebau
>Priority: Trivial
> Attachments: option1.png, option2.png, option3.png, option4.png
>
>
> KAFKA-8474 changed the layout of configuration options on the website from a 
> table which over time ran out of horizontal space to a list.
> This vastly improved readability but is not yet ideal. Further discussion 
> took place in the [PR|https://github.com/apache/kafka/pull/6870] for 
> KAFKA-8474.
> This ticket is to move that discussion to a separate thread and make it more 
> visible to other people and to give subsequent PRs a home.
> Currently proposed options are attached to this issue.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9457) Flaky test org.apache.kafka.common.network.SelectorTest.testGracefulClose

2020-01-21 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17020140#comment-17020140
 ] 

ASF GitHub Bot commented on KAFKA-9457:
---

rajinisivaram commented on pull request #7989: KAFKA-9457; Fix flaky test 
org.apache.kafka.common.network.SelectorTest.testGracefulClose
URL: https://github.com/apache/kafka/pull/7989
 
 
   Test currently assumes that exactly one receive is completed within 1 
second. We cannot guarantee that. Updated to increase timeout to 5 seconds and 
allow one or more pending receives to complete.
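
   A rough sketch of the relaxed wait described above (the helper and its 
   names are illustrative, not the actual SelectorTest change): poll until at 
   least one receive has completed or the timeout expires, rather than 
   asserting an exact count after a fixed one-second delay.
{code:java}
import java.util.function.IntSupplier;

// Illustrative helper, not the actual test code: succeed as soon as the
// supplier reports one or more completed receives; fail only after the
// full timeout (e.g. 5000 ms) elapses.
final class WaitForReceives {
    static int atLeastOne(IntSupplier completedReceives, long timeoutMs)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        int seen = 0;
        while (seen == 0 && System.currentTimeMillis() < deadline) {
            seen = completedReceives.getAsInt(); // e.g. selector.completedReceives().size()
            if (seen == 0) Thread.sleep(10);     // avoid busy-spinning
        }
        if (seen == 0)
            throw new AssertionError("no receive completed within " + timeoutMs + " ms");
        return seen; // one or more pending receives is acceptable
    }
}
{code}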
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Flaky test org.apache.kafka.common.network.SelectorTest.testGracefulClose
> -
>
> Key: KAFKA-9457
> URL: https://issues.apache.org/jira/browse/KAFKA-9457
> Project: Kafka
>  Issue Type: Bug
>  Components: network
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 2.5.0
>
>
> org.apache.kafka.common.network.SelectorTest.testGracefulClose has been 
> failing a lot in PR builds:
> {{java.lang.AssertionError: expected:<1> but was:<0>}}
> {{ at org.junit.Assert.fail(Assert.java:89)}}
> {{ at org.junit.Assert.failNotEquals(Assert.java:835)}}
> {{ at org.junit.Assert.assertEquals(Assert.java:647)}}
> {{ at org.junit.Assert.assertEquals(Assert.java:633)}}
> {{ at 
> org.apache.kafka.common.network.SelectorTest.testGracefulClose(SelectorTest.java:588)}}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9457) Flaky test org.apache.kafka.common.network.SelectorTest.testGracefulClose

2020-01-21 Thread Rajini Sivaram (Jira)
Rajini Sivaram created KAFKA-9457:
-

 Summary: Flaky test 
org.apache.kafka.common.network.SelectorTest.testGracefulClose
 Key: KAFKA-9457
 URL: https://issues.apache.org/jira/browse/KAFKA-9457
 Project: Kafka
  Issue Type: Bug
  Components: network
Reporter: Rajini Sivaram
Assignee: Rajini Sivaram
 Fix For: 2.5.0


org.apache.kafka.common.network.SelectorTest.testGracefulClose has been failing 
a lot in PR builds:

{{java.lang.AssertionError: expected:<1> but was:<0>}}
{{ at org.junit.Assert.fail(Assert.java:89)}}
{{ at org.junit.Assert.failNotEquals(Assert.java:835)}}
{{ at org.junit.Assert.assertEquals(Assert.java:647)}}
{{ at org.junit.Assert.assertEquals(Assert.java:633)}}
{{ at 
org.apache.kafka.common.network.SelectorTest.testGracefulClose(SelectorTest.java:588)}}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8733) Offline partitions occur when leader's disk is slow in reads while responding to follower fetch requests.

2020-01-21 Thread Flavien Raynaud (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8733?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17020083#comment-17020083
 ] 

Flavien Raynaud commented on KAFKA-8733:


We've seen offline partitions happen for the same reason in one of our 
clusters too, where only the leader broker for the offline partitions was 
having disk issues. It looks like there has not been much progress on the PR 
submitted on December 9th. Is there anything blocking this change from moving 
forward? :)
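
For context, the stall described in the quoted issue below reduces to a 
per-replica freshness check; a rough illustration (made-up names, not Kafka's 
actual code):
{code:java}
// Illustrative only: a follower stays in the ISR while its last caught-up
// time is within replica.lag.time.max.ms of now. If the leader's local read
// blocks longer than that, every follower fails this check at once and the
// partition can go offline.
final class IsrCheck {
    static boolean isCaughtUp(long lastCaughtUpTimeMs, long nowMs,
                              long replicaLagTimeMaxMs) {
        return nowMs - lastCaughtUpTimeMs <= replicaLagTimeMaxMs;
    }
}
{code}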

> Offline partitions occur when leader's disk is slow in reads while responding 
> to follower fetch requests.
> -
>
> Key: KAFKA-8733
> URL: https://issues.apache.org/jira/browse/KAFKA-8733
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.1.2, 2.4.0
>Reporter: Satish Duggana
>Assignee: Satish Duggana
>Priority: Critical
> Attachments: weighted-io-time-2.png, wio-time.png
>
>
> We found the offline-partitions issue multiple times on some of the hosts in 
> our clusters. After going through the broker logs and the hosts' disk stats, 
> it looks like this issue occurs whenever read/write operations take more 
> time on that disk. In a particular case where the read time is more than 
> replica.lag.time.max.ms, follower replicas will be out of sync as their 
> earlier fetch requests are stuck reading the local log and their fetch 
> status is not yet updated, as shown in the `ReplicaManager` code below. 
> If reading data from the log takes longer than replica.lag.time.max.ms, then 
> all the replicas will be out of sync and the partition becomes offline if 
> min.isr.replicas > 1 and unclean.leader.election is false.
>  
> {code:java}
> def readFromLog(): Seq[(TopicPartition, LogReadResult)] = {
>   val result = readFromLocalLog( // this call took more than 
> `replica.lag.time.max.ms`
>   replicaId = replicaId,
>   fetchOnlyFromLeader = fetchOnlyFromLeader,
>   readOnlyCommitted = fetchOnlyCommitted,
>   fetchMaxBytes = fetchMaxBytes,
>   hardMaxBytesLimit = hardMaxBytesLimit,
>   readPartitionInfo = fetchInfos,
>   quota = quota,
>   isolationLevel = isolationLevel)
>   if (isFromFollower) updateFollowerLogReadResults(replicaId, result) // 
> fetch time gets updated here, but maybeShrinkIsr should already have been 
> called and the replica removed from the ISR
>  else result
>  }
> val logReadResults = readFromLog()
> {code}
> Attached the graphs of disk weighted io time stats when this issue occurred.
> I will raise [KIP-501|https://s.apache.org/jhbpn] describing options on how 
> to handle this scenario.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0

2020-01-21 Thread leibo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17020027#comment-17020027
 ] 

leibo edited comment on KAFKA-8532 at 1/21/20 8:51 AM:
---

[~junrao] I did some further analysis, as below:

The zk client heartbeat thread is org.apache.zookeeper.ClientCnxn$SendThread:
{code:java}
SendThread(ClientCnxnSocket clientCnxnSocket) {
    super(ClientCnxn.makeThreadName("-SendThread()"));
    ClientCnxn.this.state = States.CONNECTING; // initial state
    this.clientCnxnSocket = clientCnxnSocket;
    this.setDaemon(true);
}
{code}
As we can see, the zk heartbeat thread SendThread's initial state is *CONNECTING*:
{code:java}
// code placeholder
public enum States {
    CONNECTING, ASSOCIATING, CONNECTED, CONNECTEDREADONLY,
    CLOSED, AUTH_FAILED, NOT_CONNECTED;

    // When the zk client state is CONNECTING or NOT_CONNECTED, the session is
    // not valid, so we cannot regard ZooKeeper as alive.
    public boolean isAlive() {
        return this != CLOSED && this != AUTH_FAILED;
    }

    public boolean isConnected() {
        return this == CONNECTED || this == CONNECTEDREADONLY;
    }
}
{code}
According to the above code, when the zk session expires, ClientCnxn.SendThread 
will attempt to reconnect to the zk server and establish a new session. Its 
initial state is CONNECTING, so if the zk-session-expiry thread enters the 
reinitialize method while the ZooKeeper state is CONNECTING, it will skip the 
*(!connectionState.isAlive)* condition, execute callAfterInitializingSession, 
and trigger the controller-event-thread to process RegisterBrokerAndReelect, 
even though the zk session is not yet valid.


was (Author: lbdai3190):
The zk client heartbeat thread is org.apache.zookeeper.ClientCnxn$SendThread:
{code:java}
SendThread(ClientCnxnSocket clientCnxnSocket) {
    super(ClientCnxn.makeThreadName("-SendThread()"));
    ClientCnxn.this.state = States.CONNECTING; // initial state
    this.clientCnxnSocket = clientCnxnSocket;
    this.setDaemon(true);
}
{code}
As we can see, the zk heartbeat thread SendThread's initial state is *CONNECTING*:
{code:java}
// code placeholder
public enum States {
    CONNECTING, ASSOCIATING, CONNECTED, CONNECTEDREADONLY,
    CLOSED, AUTH_FAILED, NOT_CONNECTED;

    // When the zk client state is CONNECTING or NOT_CONNECTED, the session is
    // not valid, so we cannot regard ZooKeeper as alive.
    public boolean isAlive() {
        return this != CLOSED && this != AUTH_FAILED;
    }

    public boolean isConnected() {
        return this == CONNECTED || this == CONNECTEDREADONLY;
    }
}
{code}
According to the above code, when the zk session expires, ClientCnxn.SendThread 
will attempt to reconnect to the zk server and establish a new session. Its 
initial state is CONNECTING, so if the zk-session-expiry thread enters the 
reinitialize method while the ZooKeeper state is CONNECTING, it will skip the 
*(!connectionState.isAlive)* condition, execute callAfterInitializingSession, 
and trigger the controller-event-thread to process RegisterBrokerAndReelect, 
even though the zk session is not yet valid.

> controller-event-thread deadlock with zk-session-expiry-handler0
> 
>
> Key: KAFKA-8532
> URL: https://issues.apache.org/jira/browse/KAFKA-8532
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.1
>Reporter: leibo
>Priority: Blocker
> Attachments: controllerDeadLockAnalysis-2020-01-20.png, js.log, 
> js0.log, js1.log, js2.log
>
>
> We have observed a serious deadlock between the controller-event-thread and 
> the zk-session-expiry-handler thread. When this issue occurs, the only way 
> to recover the kafka cluster is to restart the kafka server. The following 
> is the jstack log of the controller-event-thread and zk-session-expiry-handler 
> thread.
> "zk-session-expiry-handler0" #163089 daemon prio=5 os_prio=0 
> tid=0x7fcc9c01 nid=0xfb22 waiting on condition [0x7fcbb01f8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0005ee3f7000> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) // 
> waiting for the controller-event-thread to process the Expire event
>  at 
> 

[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0

2020-01-21 Thread leibo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17020027#comment-17020027
 ] 

leibo edited comment on KAFKA-8532 at 1/21/20 8:49 AM:
---

The zk client heartbeat thread is org.apache.zookeeper.ClientCnxn$SendThread:
{code:java}
SendThread(ClientCnxnSocket clientCnxnSocket) {
    super(ClientCnxn.makeThreadName("-SendThread()"));
    ClientCnxn.this.state = States.CONNECTING; // initial state
    this.clientCnxnSocket = clientCnxnSocket;
    this.setDaemon(true);
}
{code}
As we can see, the zk heartbeat thread SendThread's initial state is *CONNECTING*:
{code:java}
// code placeholder
public enum States {
    CONNECTING, ASSOCIATING, CONNECTED, CONNECTEDREADONLY,
    CLOSED, AUTH_FAILED, NOT_CONNECTED;

    // When the zk client state is CONNECTING or NOT_CONNECTED, the session is
    // not valid, so we cannot regard ZooKeeper as alive.
    public boolean isAlive() {
        return this != CLOSED && this != AUTH_FAILED;
    }

    public boolean isConnected() {
        return this == CONNECTED || this == CONNECTEDREADONLY;
    }
}
{code}
According to the above code, when the zk session expires, ClientCnxn.SendThread 
will attempt to reconnect to the zk server and establish a new session. Its 
initial state is CONNECTING, so if the zk-session-expiry thread enters the 
reinitialize method while the ZooKeeper state is CONNECTING, it will skip the 
*(!connectionState.isAlive)* condition, execute callAfterInitializingSession, 
and trigger the controller-event-thread to process RegisterBrokerAndReelect, 
even though the zk session is not yet valid.


was (Author: lbdai3190):
{code:java}
// code placeholder
public enum States {
    CONNECTING, ASSOCIATING, CONNECTED, CONNECTEDREADONLY,
    CLOSED, AUTH_FAILED, NOT_CONNECTED;

    // When the zk client state is CONNECTING or NOT_CONNECTED, the session is
    // not valid, so we cannot regard ZooKeeper as alive.
    public boolean isAlive() {
        return this != CLOSED && this != AUTH_FAILED;
    }

    /**
     * Returns whether we are connected to a server (which
     * could possibly be read-only, if this client is allowed
     * to go to read-only mode)
     */
    public boolean isConnected() {
        return this == CONNECTED || this == CONNECTEDREADONLY;
    }
}
{code}

> controller-event-thread deadlock with zk-session-expiry-handler0
> 
>
> Key: KAFKA-8532
> URL: https://issues.apache.org/jira/browse/KAFKA-8532
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.1
>Reporter: leibo
>Priority: Blocker
> Attachments: controllerDeadLockAnalysis-2020-01-20.png, js.log, 
> js0.log, js1.log, js2.log
>
>
> We have observed a serious deadlock between the controller-event-thread and 
> the zk-session-expiry-handler thread. When this issue occurs, the only way 
> to recover the kafka cluster is to restart the kafka server. The following 
> is the jstack log of the controller-event-thread and zk-session-expiry-handler 
> thread.
> "zk-session-expiry-handler0" #163089 daemon prio=5 os_prio=0 
> tid=0x7fcc9c01 nid=0xfb22 waiting on condition [0x7fcbb01f8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0005ee3f7000> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) // 
> waiting for the controller-event-thread to process the Expire event
>  at 
> kafka.controller.KafkaController$Expire.waitUntilProcessingStarted(KafkaController.scala:1533)
>  at 
> kafka.controller.KafkaController$$anon$7.beforeInitializingSession(KafkaController.scala:173)
>  at 
> kafka.zookeeper.ZooKeeperClient.callBeforeInitializingSession(ZooKeeperClient.scala:408)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1$adapted(ZooKeeperClient.scala:374)
>  at kafka.zookeeper.ZooKeeperClient$$Lambda$1473/1823438251.apply(Unknown 
> Source)
>  at scala.collection.Iterator.foreach(Iterator.scala:937)
>  at scala.collection.Iterator.foreach$(Iterator.scala:937)
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>  at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:209)
>  at 

[jira] [Comment Edited] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0

2020-01-21 Thread leibo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17019851#comment-17019851
 ] 

leibo edited comment on KAFKA-8532 at 1/21/20 8:24 AM:
---

[~junrao] 

I agree with your opinion that "all pending ZK requests should complete with a 
SessionExpired error through the response callback". 

As shown in the attachment controllerDeadLockAnalysis-2020-01-20.png I uploaded:

RegisterBrokerAndReelect needs an established zookeeper session to finish 
processing on the controller-event-thread; Expire needs to be processed by the 
controller-event-thread in order to establish a new zookeeper session; but the 
controller-event-thread is busy handling RegisterBrokerAndReelect.

According to the current analysis, I think there are two ways to deal with 
this problem:

1. Use a stricter check to prevent the RegisterBrokerAndReelect 
ControllerEvent from being executed when the zk session has expired (see the 
sketch below).

2. Find out why handleRequest blocks when the zookeeper session has expired.
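
A minimal sketch of option 1, assuming that gating on 
ZooKeeper.States#isConnected is acceptable (the class and method names are 
illustrative, not the actual ZooKeeperClient code):
{code:java}
import org.apache.zookeeper.ZooKeeper;

// Illustrative guard: isAlive() only excludes CLOSED and AUTH_FAILED, so a
// client that is still CONNECTING passes it. isConnected() additionally
// rules out CONNECTING and NOT_CONNECTED, i.e. states with no live session,
// so RegisterBrokerAndReelect would not fire against an expired session.
final class StrictSessionGuard {
    static boolean hasLiveSession(ZooKeeper zk) {
        return zk.getState().isConnected();
    }
}
{code}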


was (Author: lbdai3190):
[~junrao] 

I agree with your opinion that "all pending ZK requests should complete with a 
SessionExpired error through the response callback". 

As shown in the attachment controllerDeadLockAnalysis-2020-01-20.png I uploaded:

RegisterBrokerAndReelect needs an established zookeeper session to finish 
processing on the controller-event-thread; Expire needs to be processed by the 
controller-event-thread in order to establish a new zookeeper session; but the 
controller-event-thread is busy handling RegisterBrokerAndReelect.

According to the current analysis, I think there are two ways to deal with 
this problem:

1. Use a stricter check to prevent the RegisterBrokerAndReelect 
ControllerEvent from being executed when the zk session has expired.

2. Find out why handleRequest blocks when the zookeeper session has expired.

 

 

 

> controller-event-thread deadlock with zk-session-expiry-handler0
> 
>
> Key: KAFKA-8532
> URL: https://issues.apache.org/jira/browse/KAFKA-8532
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.1
>Reporter: leibo
>Priority: Blocker
> Attachments: controllerDeadLockAnalysis-2020-01-20.png, js.log, 
> js0.log, js1.log, js2.log
>
>
> We have observed a serious deadlock between the controller-event-thread and 
> the zk-session-expiry-handler thread. When this issue occurs, the only way 
> to recover the kafka cluster is to restart the kafka server. The following 
> is the jstack log of the controller-event-thread and zk-session-expiry-handler 
> thread.
> "zk-session-expiry-handler0" #163089 daemon prio=5 os_prio=0 
> tid=0x7fcc9c01 nid=0xfb22 waiting on condition [0x7fcbb01f8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0005ee3f7000> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) // 
> waiting for the controller-event-thread to process the Expire event
>  at 
> kafka.controller.KafkaController$Expire.waitUntilProcessingStarted(KafkaController.scala:1533)
>  at 
> kafka.controller.KafkaController$$anon$7.beforeInitializingSession(KafkaController.scala:173)
>  at 
> kafka.zookeeper.ZooKeeperClient.callBeforeInitializingSession(ZooKeeperClient.scala:408)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1$adapted(ZooKeeperClient.scala:374)
>  at kafka.zookeeper.ZooKeeperClient$$Lambda$1473/1823438251.apply(Unknown 
> Source)
>  at scala.collection.Iterator.foreach(Iterator.scala:937)
>  at scala.collection.Iterator.foreach$(Iterator.scala:937)
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>  at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:209)
>  at kafka.zookeeper.ZooKeeperClient.reinitialize(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$scheduleSessionExpiryHandler$1(ZooKeeperClient.scala:428)
>  at 
> kafka.zookeeper.ZooKeeperClient$$Lambda$1471/701792920.apply$mcV$sp(Unknown 
> Source)
>  at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114)
>  at kafka.utils.KafkaScheduler$$Lambda$198/1048098469.apply$mcV$sp(Unknown 
> Source)
>  at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:63)
>  at 

[jira] [Commented] (KAFKA-8532) controller-event-thread deadlock with zk-session-expiry-handler0

2020-01-21 Thread leibo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8532?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17020027#comment-17020027
 ] 

leibo commented on KAFKA-8532:
--

{code:java}
// code placeholder
public enum States {
    CONNECTING, ASSOCIATING, CONNECTED, CONNECTEDREADONLY,
    CLOSED, AUTH_FAILED, NOT_CONNECTED;

    // When the zk client state is CONNECTING or NOT_CONNECTED, the session is
    // not valid, so we cannot regard ZooKeeper as alive.
    public boolean isAlive() {
        return this != CLOSED && this != AUTH_FAILED;
    }

    /**
     * Returns whether we are connected to a server (which
     * could possibly be read-only, if this client is allowed
     * to go to read-only mode)
     */
    public boolean isConnected() {
        return this == CONNECTED || this == CONNECTEDREADONLY;
    }
}
{code}

> controller-event-thread deadlock with zk-session-expiry-handler0
> 
>
> Key: KAFKA-8532
> URL: https://issues.apache.org/jira/browse/KAFKA-8532
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.1
>Reporter: leibo
>Priority: Blocker
> Attachments: controllerDeadLockAnalysis-2020-01-20.png, js.log, 
> js0.log, js1.log, js2.log
>
>
> We have observed a serious deadlock between the controller-event-thread and 
> the zk-session-expiry-handler thread. When this issue occurs, the only way 
> to recover the kafka cluster is to restart the kafka server. The following 
> is the jstack log of the controller-event-thread and zk-session-expiry-handler 
> thread.
> "zk-session-expiry-handler0" #163089 daemon prio=5 os_prio=0 
> tid=0x7fcc9c01 nid=0xfb22 waiting on condition [0x7fcbb01f8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0005ee3f7000> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) // 
> waiting for the controller-event-thread to process the Expire event
>  at 
> kafka.controller.KafkaController$Expire.waitUntilProcessingStarted(KafkaController.scala:1533)
>  at 
> kafka.controller.KafkaController$$anon$7.beforeInitializingSession(KafkaController.scala:173)
>  at 
> kafka.zookeeper.ZooKeeperClient.callBeforeInitializingSession(ZooKeeperClient.scala:408)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$reinitialize$1$adapted(ZooKeeperClient.scala:374)
>  at kafka.zookeeper.ZooKeeperClient$$Lambda$1473/1823438251.apply(Unknown 
> Source)
>  at scala.collection.Iterator.foreach(Iterator.scala:937)
>  at scala.collection.Iterator.foreach$(Iterator.scala:937)
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>  at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:209)
>  at kafka.zookeeper.ZooKeeperClient.reinitialize(ZooKeeperClient.scala:374)
>  at 
> kafka.zookeeper.ZooKeeperClient.$anonfun$scheduleSessionExpiryHandler$1(ZooKeeperClient.scala:428)
>  at 
> kafka.zookeeper.ZooKeeperClient$$Lambda$1471/701792920.apply$mcV$sp(Unknown 
> Source)
>  at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114)
>  at kafka.utils.KafkaScheduler$$Lambda$198/1048098469.apply$mcV$sp(Unknown 
> Source)
>  at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:63)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
> Locked ownable synchronizers:
>  - <0x000661e8d2e0> (a java.util.concurrent.ThreadPoolExecutor$Worker)
> "controller-event-thread" #51 prio=5 os_prio=0 tid=0x7fceaeec4000 
> nid=0x310 waiting on condition [0x7fccb55c8000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0005d1be5a00> (a 
> java.util.concurrent.CountDownLatch$Sync)
>