[jira] [Commented] (KAFKA-6684) Support casting values with bytes schema to string
[ https://issues.apache.org/jira/browse/KAFKA-6684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16460331#comment-16460331 ] Amit Sela commented on KAFKA-6684: -- Link to pr#4950 looks like a mistake, ignore. > Support casting values with bytes schema to string > --- > > Key: KAFKA-6684 > URL: https://issues.apache.org/jira/browse/KAFKA-6684 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Amit Sela >Priority: Critical > Fix For: 2.0.0 > > > Casting from BYTES is not supported, which means that casting LogicalTypes is > not supported. > This proposes to allow casting anything to a string, kind of like Java's > {{toString()}}, such that if the object is actually a LogicalType it can be > "serialized" as string instead of bytes+schema. > > {noformat} > Examples: > BigDecimal will cast to the string representation of the number. > Timestamp will cast to the string representation of the timestamp, or maybe > UTC mmddTHH:MM:SS.f format? > {noformat} > > Worst case, bytes are "casted" to whatever the {{toString()}} returns - it's > up to the user to know the data. > This would help when using a JSON sink, or anything that's not Avro. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
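The proposal above can be sketched as a small helper, independent of Connect's actual Cast transformation (the class and method names here are hypothetical, and the raw-bytes branch assumes UTF-8 purely for illustration):

```java
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;

// Hypothetical helper illustrating the idea: fall back to toString() when a
// value (possibly a logical type backed by a BYTES schema) must become a string.
public class CastToString {
    public static String cast(Object value) {
        if (value == null) {
            return null;
        }
        if (value instanceof byte[]) {
            // Raw bytes: the proposal leaves interpretation up to the user;
            // UTF-8 is assumed here only for the example.
            return new String((byte[]) value, StandardCharsets.UTF_8);
        }
        // Logical types such as BigDecimal or Timestamp "serialize" via toString().
        return value.toString();
    }

    public static void main(String[] args) {
        System.out.println(cast(new BigDecimal("12.34"))); // prints "12.34"
    }
}
```

As the comment notes, in the worst case the result is whatever toString() returns, so the user must know what the data means.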
[jira] [Commented] (KAFKA-6845) Shrink size of docker image
[ https://issues.apache.org/jira/browse/KAFKA-6845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16460313#comment-16460313 ] ASF GitHub Bot commented on KAFKA-6845: --- jayqi opened a new pull request #4951: KAFKA-6845: Shrink size of docker image URL: https://github.com/apache/kafka/pull/4951 Jira issue: https://issues.apache.org/jira/browse/KAFKA-6845 Proposing a very small change to slightly reduce the size of the Docker image: - Adding `--no-cache-dir` flag to all `pip install` commands to prevent caching of build artifacts When building, I'm seeing an ~20 MB reduction in image size. (1.76 GB -> 1.74 GB) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Shrink size of docker image > --- > > Key: KAFKA-6845 > URL: https://issues.apache.org/jira/browse/KAFKA-6845 > Project: Kafka > Issue Type: Improvement >Reporter: Jay Qi >Priority: Trivial > > Proposing a very small change to slightly reduce the size of the Docker image: > * Adding {{--no-cache-dir}} flag to all {{pip install}} commands to prevent > caching of build artifacts > When building, I'm seeing an ~20 MB reduction in image size. (1.76 GB -> 1.74 > GB) > PR coming shortly. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6845) Shrink size of docker image
Jay Qi created KAFKA-6845: - Summary: Shrink size of docker image Key: KAFKA-6845 URL: https://issues.apache.org/jira/browse/KAFKA-6845 Project: Kafka Issue Type: Improvement Reporter: Jay Qi Proposing a very small change to slightly reduce the size of the Docker image: * Adding {{--no-cache-dir}} flag to all {{pip install}} commands to prevent caching of build artifacts When building, I'm seeing an ~20 MB reduction in image size. (1.76 GB -> 1.74 GB) PR coming shortly. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
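The change amounts to a one-flag edit per pip invocation; a hypothetical Dockerfile fragment (the package name below is a placeholder, not the image's actual dependency list):

```dockerfile
# --no-cache-dir keeps pip's download/build cache out of the image layer,
# which is where the ~20 MB reduction comes from.
RUN pip install --no-cache-dir some-package
```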
[jira] [Commented] (KAFKA-6844) Race condition between StreamThread and GlobalStreamThread stopping
[ https://issues.apache.org/jira/browse/KAFKA-6844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16460262#comment-16460262 ] Bill Bejeck commented on KAFKA-6844: https://github.com/apache/kafka/pull/4950 > Race condition between StreamThread and GlobalStreamThread stopping > --- > > Key: KAFKA-6844 > URL: https://issues.apache.org/jira/browse/KAFKA-6844 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.0.0, 1.1.0, 1.0.1 >Reporter: Bill Bejeck >Assignee: Bill Bejeck >Priority: Major > Fix For: 2.0.0 > > > There can be a race condition where shut down is called on a StreamThread > then shut down is called on a GlobalStreamThread, but the StreamThread can be > delayed in shutting down, and the GlobalStreamThread can shutdown first. > > If the StreamThread tries to access a GlobalStateStore before closing the > user can get an exception stating "..{{Store xxx is currently closed "}} > Here's a redacted partial log file showing this process: > {{2018-04-23 12:54:10 [INFO] [logger] DataExportTopology:86 - Closing > streams}} > {{2018-04-23 12:54:10 [INFO] [logger] KafkaStreams:346 - stream-client > [redacted-info] State transition from RUNNING to PENDING_SHUTDOWN}} > {{2018-04-23 12:54:10 [INFO] [logger] StreamThread:336 - stream-thread > [redacted-info-StreamThread-1] Informed to shut down}} > {{2018-04-23 12:54:10 [INFO] [logger] StreamThread:346 - stream-thread > [redacted-info-StreamThread-1] State transition from RUNNING to > PENDING_SHUTDOWN}} > {{2018-04-23 12:54:10 [INFO] [logger] StreamThread:336 - stream-thread > [redacted-info-StreamThread-2] Informed to shut down}} > {{2018-04-23 12:54:10 [INFO] [logger] StreamThread:346 - stream-thread > [redacted-info-StreamThread-2] State transition from RUNNING to > PENDING_SHUTDOWN}} > {{2018-04-23 12:54:10 [INFO] [logger] StreamThread:336 - stream-thread > [redacted-info-StreamThread-3] Informed to shut down}} > {{2018-04-23 12:54:10 [INFO] [logger] 
StreamThread:346 - stream-thread > [redacted-info-StreamThread-3] State transition from RUNNING to > PENDING_SHUTDOWN}} > {{2018-04-23 12:54:10 [INFO] [logger] StreamThread:336 - stream-thread > [redacted-info-StreamThread-4] Informed to shut down}} > {{2018-04-23 12:54:10 [INFO] [logger] StreamThread:346 - stream-thread > [redacted-info-StreamThread-4] State transition from RUNNING to > PENDING_SHUTDOWN}} > {{2018-04-23 12:54:10 [INFO] [logger] GlobalStreamThread:346 - > global-stream-thread [redacted-info-GlobalStreamThread] State transition from > RUNNING to PENDING_SHUTDOWN}} > {{2018-04-23 12:54:10 [INFO] [logger] StreamThread:336 - stream-thread > [redacted-info-StreamThread-4] Shutting down}} > {{2018-04-23 12:54:10 [INFO] [logger] StreamThread:336 - stream-thread > [redacted-info-StreamThread-1] Shutting down}} > {{2018-04-23 12:54:10 [INFO] [logger] GlobalStreamThread:336 - > global-stream-thread [redacted-info-GlobalStreamThread] Shutting down}} > {{2018-04-23 12:54:10 [INFO] [logger] GlobalStreamThread:346 - > global-stream-thread [redacted-info-GlobalStreamThread] State transition from > PENDING_SHUTDOWN to DEAD}} > {{2018-04-23 12:54:10 [INFO] [logger] GlobalStreamThread:336 - > global-stream-thread [redacted-info-GlobalStreamThread] Shutdown complete}} > {{2018-04-23 12:54:10 [INFO] [logger] StreamThread:336 - stream-thread > [redacted-info-StreamThread-2] Shutting down}} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
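The ordering fix implied by the log above, shutting down every StreamThread before the GlobalStreamThread, can be modeled with plain threads (a simplified sketch using java.lang.Thread, not the actual KafkaStreams shutdown code):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

// Simplified model of the fix direction: request shutdown of every
// StreamThread and join it *before* allowing the GlobalStreamThread to shut
// down, so no worker can observe a closed global state store.
public class OrderedShutdown {
    public static List<String> shutdownInOrder(int numStreamThreads) throws InterruptedException {
        List<String> stopped = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch globalShutdownRequested = new CountDownLatch(1);

        // The global thread stays alive until shutdown is explicitly requested.
        Thread global = new Thread(() -> {
            try {
                globalShutdownRequested.await();
            } catch (InterruptedException ignored) { }
            stopped.add("GlobalStreamThread");
        }, "GlobalStreamThread");
        global.start();

        List<Thread> workers = new ArrayList<>();
        for (int i = 1; i <= numStreamThreads; i++) {
            String name = "StreamThread-" + i;
            Thread t = new Thread(() -> stopped.add(name), name);
            t.start();
            workers.add(t);
        }

        // 1. Join every StreamThread first.
        for (Thread t : workers) {
            t.join();
        }
        // 2. Only then request GlobalStreamThread shutdown.
        globalShutdownRequested.countDown();
        global.join();
        return stopped;
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> order = shutdownInOrder(4);
        System.out.println(order.get(order.size() - 1)); // prints "GlobalStreamThread"
    }
}
```

In the buggy interleaving from the log, the GlobalStreamThread reaches DEAD while StreamThread-2 is still shutting down; enforcing the join-then-signal order above makes that interleaving impossible.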
[jira] [Commented] (KAFKA-6684) Support casting values with bytes schema to string
[ https://issues.apache.org/jira/browse/KAFKA-6684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16460261#comment-16460261 ] ASF GitHub Bot commented on KAFKA-6684: --- bbejeck opened a new pull request #4950: KAFKA-6684: Call shutdown on all GlobalStreamThread after all StreamThreads have stopped URL: https://github.com/apache/kafka/pull/4950 Moved the shutdown of `GlobalStreamThread` to after all `StreamThread` instances have stopped. There can be a race condition where shut down is called on a `StreamThread` then shut down is called on a GlobalStreamThread, but if the StreamThread is delayed in shutting down, the GlobalStreamThread can shut down first. If the StreamThread tries to access a GlobalStateStore before closing, the user can get an exception stating "..Store xxx is currently closed " Tested by running all current streams tests. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Support casting values with bytes schema to string > --- > > Key: KAFKA-6684 > URL: https://issues.apache.org/jira/browse/KAFKA-6684 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Amit Sela >Priority: Critical > Fix For: 2.0.0 > > > Casting from BYTES is not supported, which means that casting LogicalTypes is > not supported. > This proposes to allow casting anything to a string, kind of like Java's > {{toString()}}, such that if the object is actually a LogicalType it can be > "serialized" as string instead of bytes+schema. > > {noformat} > Examples: > BigDecimal will cast to the string representation of the number. 
> Timestamp will cast to the string representation of the timestamp, or maybe > UTC mmddTHH:MM:SS.f format? > {noformat} > > Worst case, bytes are "casted" to whatever the {{toString()}} returns - it's > up to the user to know the data. > This would help when using a JSON sink, or anything that's not Avro. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6844) Race condition between StreamThread and GlobalStreamThread stopping
[ https://issues.apache.org/jira/browse/KAFKA-6844?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck updated KAFKA-6844: --- Description: There can be a race condition where shut down is called on a StreamThread then shut down is called on a GlobalStreamThread, but the StreamThread can be delayed in shutting down, and the GlobalStreamThread can shutdown first. If the StreamThread tries to access a GlobalStateStore before closing the user can get an exception stating "..{{Store xxx is currently closed "}} Here's a redacted partial log file showing this process: {{2018-04-23 12:54:10 [INFO] [logger] DataExportTopology:86 - Closing streams}} {{2018-04-23 12:54:10 [INFO] [logger] KafkaStreams:346 - stream-client [redacted-info] State transition from RUNNING to PENDING_SHUTDOWN}} {{2018-04-23 12:54:10 [INFO] [logger] StreamThread:336 - stream-thread [redacted-info-StreamThread-1] Informed to shut down}} {{2018-04-23 12:54:10 [INFO] [logger] StreamThread:346 - stream-thread [redacted-info-StreamThread-1] State transition from RUNNING to PENDING_SHUTDOWN}} {{2018-04-23 12:54:10 [INFO] [logger] StreamThread:336 - stream-thread [redacted-info-StreamThread-2] Informed to shut down}} {{2018-04-23 12:54:10 [INFO] [logger] StreamThread:346 - stream-thread [redacted-info-StreamThread-2] State transition from RUNNING to PENDING_SHUTDOWN}} {{2018-04-23 12:54:10 [INFO] [logger] StreamThread:336 - stream-thread [redacted-info-StreamThread-3] Informed to shut down}} {{2018-04-23 12:54:10 [INFO] [logger] StreamThread:346 - stream-thread [redacted-info-StreamThread-3] State transition from RUNNING to PENDING_SHUTDOWN}} {{2018-04-23 12:54:10 [INFO] [logger] StreamThread:336 - stream-thread [redacted-info-StreamThread-4] Informed to shut down}} {{2018-04-23 12:54:10 [INFO] [logger] StreamThread:346 - stream-thread [redacted-info-StreamThread-4] State transition from RUNNING to PENDING_SHUTDOWN}} {{2018-04-23 12:54:10 [INFO] [logger] GlobalStreamThread:346 - 
global-stream-thread [redacted-info-GlobalStreamThread] State transition from RUNNING to PENDING_SHUTDOWN}} {{2018-04-23 12:54:10 [INFO] [logger] StreamThread:336 - stream-thread [redacted-info-StreamThread-4] Shutting down}} {{2018-04-23 12:54:10 [INFO] [logger] StreamThread:336 - stream-thread [redacted-info-StreamThread-1] Shutting down}} {{2018-04-23 12:54:10 [INFO] [logger] GlobalStreamThread:336 - global-stream-thread [redacted-info-GlobalStreamThread] Shutting down}} {{2018-04-23 12:54:10 [INFO] [logger] GlobalStreamThread:346 - global-stream-thread [redacted-info-GlobalStreamThread] State transition from PENDING_SHUTDOWN to DEAD}} {{2018-04-23 12:54:10 [INFO] [logger] GlobalStreamThread:336 - global-stream-thread [redacted-info-GlobalStreamThread] Shutdown complete}} {{2018-04-23 12:54:10 [INFO] [logger] StreamThread:336 - stream-thread [redacted-info-StreamThread-2] Shutting down}} was: There can be a race condition where shut down is called on a StreamThread then shut down is called on a GlobalStreamThread, but the StreamThread can be delayed in shutting down, and the GlobalStreamThread can shutdown first. 
If the StreamThread tries to access a GlobalStateStore before closing the user can get an exception stating "..{{Store xxx is currently closed "}} Here's a redacted partial log file showing this process: {{2018-04-23 12:54:10 [INFO] [logger] DataExportTopology:86 - Closing streams}} {{2018-04-23 12:54:10 [INFO] [logger] KafkaStreams:346 - stream-client [redacted-info] State transition from RUNNING to PENDING_SHUTDOWN}} {{2018-04-23 12:54:10 [INFO] [logger] StreamThread:336 - stream-thread [redacted-info-StreamThread-1] Informed to shut down}} {{2018-04-23 12:54:10 [INFO] [logger] StreamThread:346 - stream-thread [redacted-info-StreamThread-1] State transition from RUNNING to PENDING_SHUTDOWN}} {{2018-04-23 12:54:10 [INFO] [logger] StreamThread:336 - stream-thread [redacted-info-StreamThread-2] Informed to shut down}} {{2018-04-23 12:54:10 [INFO] [logger] StreamThread:346 - stream-thread [redacted-info-StreamThread-2] State transition from RUNNING to PENDING_SHUTDOWN}} {{2018-04-23 12:54:10 [INFO] [logger] StreamThread:336 - stream-thread [redacted-info-StreamThread-3] Informed to shut down}} {{2018-04-23 12:54:10 [INFO] [logger] StreamThread:346 - stream-thread [redacted-info-StreamThread-3] State transition from RUNNING to PENDING_SHUTDOWN}} {{2018-04-23 12:54:10 [INFO] [logger] StreamThread:336 - stream-thread [redacted-info-StreamThread-4] Informed to shut down}} {{2018-04-23 12:54:10 [INFO] [logger] StreamThread:346 - stream-thread [redacted-info-StreamThread-4] State transition from RUNNING to PENDING_SHUTDOWN}} {{2018-04-23 12:54:10 [INFO] [logger] GlobalStreamThread:346 - global-stream-thread [redacted-info-GlobalStreamThread] State transition from RUNNING to PENDING_SHUTDOWN}} {{2018-04-23 12:54:10 [INFO] [lo
[jira] [Created] (KAFKA-6844) Race condition between StreamThread and GlobalStreamThread stopping
Bill Bejeck created KAFKA-6844: -- Summary: Race condition between StreamThread and GlobalStreamThread stopping Key: KAFKA-6844 URL: https://issues.apache.org/jira/browse/KAFKA-6844 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 1.0.1, 1.1.0, 1.0.0 Reporter: Bill Bejeck Assignee: Bill Bejeck Fix For: 2.0.0 There can be a race condition where shut down is called on a StreamThread then shut down is called on a GlobalStreamThread, but the StreamThread can be delayed in shutting down, and the GlobalStreamThread can shutdown first. If the StreamThread tries to access a GlobalStateStore before closing the user can get an exception stating "..{{Store xxx is currently closed "}} Here's a redacted partial log file showing this process: {{2018-04-23 12:54:10 [INFO] [logger] DataExportTopology:86 - Closing streams}} {{2018-04-23 12:54:10 [INFO] [logger] KafkaStreams:346 - stream-client [redacted-info] State transition from RUNNING to PENDING_SHUTDOWN}} {{2018-04-23 12:54:10 [INFO] [logger] StreamThread:336 - stream-thread [redacted-info-StreamThread-1] Informed to shut down}} {{2018-04-23 12:54:10 [INFO] [logger] StreamThread:346 - stream-thread [redacted-info-StreamThread-1] State transition from RUNNING to PENDING_SHUTDOWN}} {{2018-04-23 12:54:10 [INFO] [logger] StreamThread:336 - stream-thread [redacted-info-StreamThread-2] Informed to shut down}} {{2018-04-23 12:54:10 [INFO] [logger] StreamThread:346 - stream-thread [redacted-info-StreamThread-2] State transition from RUNNING to PENDING_SHUTDOWN}} {{2018-04-23 12:54:10 [INFO] [logger] StreamThread:336 - stream-thread [redacted-info-StreamThread-3] Informed to shut down}} {{2018-04-23 12:54:10 [INFO] [logger] StreamThread:346 - stream-thread [redacted-info-StreamThread-3] State transition from RUNNING to PENDING_SHUTDOWN}} {{2018-04-23 12:54:10 [INFO] [logger] StreamThread:336 - stream-thread [redacted-info-StreamThread-4] Informed to shut down}} {{2018-04-23 12:54:10 [INFO] [logger] StreamThread:346 
- stream-thread [redacted-info-StreamThread-4] State transition from RUNNING to PENDING_SHUTDOWN}} {{2018-04-23 12:54:10 [INFO] [logger] GlobalStreamThread:346 - global-stream-thread [redacted-info-GlobalStreamThread] State transition from RUNNING to PENDING_SHUTDOWN}} {{2018-04-23 12:54:10 [INFO] [logger] StreamThread:336 - stream-thread [redacted-info-StreamThread-4] Shutting down}} {{2018-04-23 12:54:10 [INFO] [logger] StreamThread:336 - stream-thread [redacted-info-StreamThread-1] Shutting down}} {{2018-04-23 12:54:10 [INFO] [logger] GlobalStreamThread:336 - global-stream-thread [redacted-info-GlobalStreamThread] Shutting down}} {{2018-04-23 12:54:10 [WARN] [logger] StatsDReporter:121 - KafkaStatsDReporter is disabled}} {{2018-04-23 12:54:10 [INFO] [logger] GlobalStreamThread:346 - global-stream-thread [redacted-info-GlobalStreamThread] State transition from PENDING_SHUTDOWN to DEAD}} {{2018-04-23 12:54:10 [INFO] [logger] GlobalStreamThread:336 - global-stream-thread [redacted-info-GlobalStreamThread] Shutdown complete}} {{2018-04-23 12:54:10 [INFO] [logger] StreamThread:336 - stream-thread [redacted-info-StreamThread-2] Shutting down}} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-6817) UnknownProducerIdException when writing messages with old timestamps
[ https://issues.apache.org/jira/browse/KAFKA-6817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16460142#comment-16460142 ] Wouter Bancken edited comment on KAFKA-6817 at 5/1/18 8:56 PM: --- We seem to be running into a similar issue. We are using one Kafka Producer which is a long-running bean with idempotence enabled. The exception that we are getting is {code} kafka-producer-network-thread | producer-1] ERROR apache.kafka.clients.producer.internals.Sender - [Producer clientId=producer-1] The broker returned org.apache.kafka.common.errors.UnknownProducerIdException: This exception is raised by the broker if it could not locate the producer metadata associated with the producerId in question. This could happen if, for instance, the producer's records were deleted because their retention time had elapsed. Once the last records of the producerId are removed, the producer's metadata is removed from the broker, and future appends by the producer will return this exception. for topic-partition company.company-alias-0 at offset -1. This indicates data loss on the broker, and should be investigated. Caused by: org.apache.kafka.common.errors.UnknownProducerIdException: This exception is raised by the broker if it could not locate the producer metadata associated with the producerId in question. This could happen if, for instance, the producer's records were deleted because their retention time had elapsed. Once the last records of the producerId are removed, the producer's metadata is removed from the broker, and future appends by the producer will return this exception. {code} We have no knowledge of issues on the broker that might have caused any data loss. We are using Kafka 1.0.0 was (Author: wouterbanckenaca): We seem to be running into a similar issue. We are using one Kafka Producer which is a long-running bean with idempotence enabled. 
The exception that we are getting is {code} kafka-producer-network-thread | producer-1] ERROR apache.kafka.clients.producer.internals.Sender - [Producer clientId=producer-1] The broker returned org.apache.kafka.common.errors.UnknownProducerIdException: This exception is raised by the broker if it could not locate the producer metadata associated with the producerId in question. This could happen if, for instance, the producer's records were deleted because their retention time had elapsed. Once the last records of the producerId are removed, the producer's metadata is removed from the broker, and future appends by the producer will return this exception. for topic-partition company.company-alias-0 at offset -1. This indicates data loss on the broker, and should be investigated. Caused by: org.apache.kafka.common.errors.UnknownProducerIdException: This exception is raised by the broker if it could not locate the producer metadata associated with the producerId in question. This could happen if, for instance, the producer's records were deleted because their retention time had elapsed. Once the last records of the producerId are removed, the producer's metadata is removed from the broker, and future appends by the producer will return this exception. {code} We have no knowledge of issues on the broker that might have caused any data loss. 
> UnknownProducerIdException when writing messages with old timestamps > > > Key: KAFKA-6817 > URL: https://issues.apache.org/jira/browse/KAFKA-6817 > Project: Kafka > Issue Type: Bug > Components: producer >Affects Versions: 1.1.0 >Reporter: Odin Standal >Priority: Major > > We are seeing the following exception in our Kafka application: > {code:java} > ERROR o.a.k.s.p.internals.StreamTask - task [0_0] Failed to close producer > due to the following error: org.apache.kafka.streams.errors.StreamsException: > task [0_0] Abort sending since an error caught with a previous record (key > 22 value some-value timestamp 1519200902670) to topic > exactly-once-test-topic- v2 due to This exception is raised by the broker if > it could not locate the producer metadata associated with the producerId in > question. This could happen if, for instance, the producer's records were > deleted because their retention time had elapsed. Once the last records of > the producerId are removed, the producer's metadata is removed from the > broker, and future appends by the producer will return this exception. at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:125) > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:48) > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollec
[jira] [Commented] (KAFKA-6817) UnknownProducerIdException when writing messages with old timestamps
[ https://issues.apache.org/jira/browse/KAFKA-6817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16460142#comment-16460142 ] Wouter Bancken commented on KAFKA-6817: --- We seem to be running into a similar issue. We are using one Kafka Producer which is a long-running bean with idempotence enabled. The exception that we are getting is {code} kafka-producer-network-thread | producer-1] ERROR apache.kafka.clients.producer.internals.Sender - [Producer clientId=producer-1] The broker returned org.apache.kafka.common.errors.UnknownProducerIdException: This exception is raised by the broker if it could not locate the producer metadata associated with the producerId in question. This could happen if, for instance, the producer's records were deleted because their retention time had elapsed. Once the last records of the producerId are removed, the producer's metadata is removed from the broker, and future appends by the producer will return this exception. for topic-partition company.company-alias-0 at offset -1. This indicates data loss on the broker, and should be investigated. Caused by: org.apache.kafka.common.errors.UnknownProducerIdException: This exception is raised by the broker if it could not locate the producer metadata associated with the producerId in question. This could happen if, for instance, the producer's records were deleted because their retention time had elapsed. Once the last records of the producerId are removed, the producer's metadata is removed from the broker, and future appends by the producer will return this exception. {code} We have no knowledge of issues on the broker that might have caused any data loss. 
> UnknownProducerIdException when writing messages with old timestamps > > > Key: KAFKA-6817 > URL: https://issues.apache.org/jira/browse/KAFKA-6817 > Project: Kafka > Issue Type: Bug > Components: producer >Affects Versions: 1.1.0 >Reporter: Odin Standal >Priority: Major > > We are seeing the following exception in our Kafka application: > {code:java} > ERROR o.a.k.s.p.internals.StreamTask - task [0_0] Failed to close producer > due to the following error: org.apache.kafka.streams.errors.StreamsException: > task [0_0] Abort sending since an error caught with a previous record (key > 22 value some-value timestamp 1519200902670) to topic > exactly-once-test-topic- v2 due to This exception is raised by the broker if > it could not locate the producer metadata associated with the producerId in > question. This could happen if, for instance, the producer's records were > deleted because their retention time had elapsed. Once the last records of > the producerId are removed, the producer's metadata is removed from the > broker, and future appends by the producer will return this exception. 
at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:125) > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:48) > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:180) > at > org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1199) > at > org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204) > at > org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187) > at > org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:627) > at > org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:596) > at > org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:557) > at > org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:481) > at > org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74) > at > org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:692) > at > org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:101) > at > org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:482) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:474) at > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239) at > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163) at > java.lang.Thread.run(Thread.java:748) Caused by: > org.apache.kafka.common.errors.UnknownProducerIdException > {code} > We discovered this error when we had the need to reprocess old messages. See > more details on > [Stackoverflow|https://stackoverflow.com/questions/49872827/unknownproduceridexception-in-kafka-s
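For context on the setup the reporters describe (a single long-lived producer with idempotence enabled), here is a minimal sketch of that configuration using plain string config keys so it needs no kafka-clients dependency; the broker address and retry value are illustrative only:

```java
import java.util.Properties;

// Minimal sketch of an idempotent-producer configuration. With idempotence
// enabled, the broker tracks per-producerId metadata that is removed once the
// producer's last records age out of retention -- which is exactly the
// UnknownProducerIdException failure mode reported in this issue.
public class IdempotentProducerConfig {
    public static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("enable.idempotence", "true"); // implies acks=all
        props.setProperty("retries", String.valueOf(Integer.MAX_VALUE));
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build("localhost:9092").getProperty("enable.idempotence")); // prints "true"
    }
}
```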
[jira] [Created] (KAFKA-6843) Document issue with DNS TTL
David Glasser created KAFKA-6843: Summary: Document issue with DNS TTL Key: KAFKA-6843 URL: https://issues.apache.org/jira/browse/KAFKA-6843 Project: Kafka Issue Type: Bug Reporter: David Glasser We run Kafka and Zookeeper in Google Kubernetes Engine. We have recently had incidents where our brokers ran into serious problems when GKE replaced our cluster (cycling both Zookeeper and Kafka in parallel). Kafka (1.0) brokers lost the ability to talk to Zookeeper, and eventually failed their controlled shutdown, leading to slow startup times for the new broker and outages for our system. We eventually tracked this down to the fact that (at least in our environment) the default JVM DNS caching behavior is to cache results forever. We rely on DNS to connect to Zookeeper, and the DNS resolution changes when the Zookeeper pods are replaced. The fix is straightforward: setting the property networkaddress.cache.ttl or sun.net.inetaddr.ttl to make the caching non-infinite (or use a "security manager"). See [https://docs.oracle.com/javase/8/docs/technotes/guides/net/properties.html] for details. I think this gotcha should be documented. Probably at [https://kafka.apache.org/11/documentation/#java] ? I'm happy to submit a PR if people agree this is the right place. (I suppose somehow fixing this in code would be nice too.) By the way, if you search the Apache issue tracker for [networkaddress.cache.ttl|https://issues.apache.org/jira/browse/JAMES-774?jql=text%20~%20%22%5C%22networkaddress.cache.ttl%5C%22%22], you'll learn that this is a common issue faced by many Apache Java projects. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
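The fix described above can be applied either via the JVM flag (-Dsun.net.inetaddr.ttl=60) or programmatically before any lookups happen; a minimal sketch, where the 60-second TTL is just an example value:

```java
import java.security.Security;

// Override the JVM's DNS cache TTL so address resolutions expire instead of
// being cached forever. Must run before the first DNS lookup to take effect.
public class DnsTtl {
    public static void main(String[] args) {
        Security.setProperty("networkaddress.cache.ttl", "60");
        System.out.println(Security.getProperty("networkaddress.cache.ttl")); // prints "60"
    }
}
```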
[jira] [Commented] (KAFKA-6520) When a Kafka Stream can't communicate with the server, it's Status stays RUNNING
[ https://issues.apache.org/jira/browse/KAFKA-6520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16460076#comment-16460076 ] Milind Jain commented on KAFKA-6520: Hi, I tried reproducing the issue with /kafka/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java On stopping the broker when WordCountDemo is still running, I am not able to see any error in the WordCountDemo, It keeps running as if the broker is still alive. [~mjsax] [~mwkohout] What should I do next. I am able to reproduce the issue. > When a Kafka Stream can't communicate with the server, it's Status stays > RUNNING > > > Key: KAFKA-6520 > URL: https://issues.apache.org/jira/browse/KAFKA-6520 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Michael Kohout >Assignee: Milind Jain >Priority: Major > Labels: newbie, user-experience > > When you execute the following scenario the application is always in RUNNING > state > > 1)start kafka > 2)start app, app connects to kafka and starts processing > 3)kill kafka(stop docker container) > 4)the application doesn't give any indication that it's no longer > connected(Stream State is still RUNNING, and the uncaught exception handler > isn't invoked) > > > It would be useful if the Stream State had a DISCONNECTED status. > > See > [this|https://groups.google.com/forum/#!topic/confluent-platform/nQh2ohgdrIQ] > for a discussion from the google user forum. > [This|https://issues.apache.org/jira/browse/KAFKA-4564] is a link to a > related issue. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-6520) When a Kafka Stream can't communicate with the server, it's Status stays RUNNING
[ https://issues.apache.org/jira/browse/KAFKA-6520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16460076#comment-16460076 ] Milind Jain edited comment on KAFKA-6520 at 5/1/18 8:05 PM: Hi, I tried reproducing the issue with /kafka/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java On stopping the broker when WordCountDemo is still running, I am not able to see any error in the WordCountDemo, It keeps running as if the broker is still alive. [~mjsax] [~mwkohout] What should I do next. I am able to reproduce the issue? was (Author: milindjain): Hi, I tried reproducing the issue with /kafka/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java On stopping the broker when WordCountDemo is still running, I am not able to see any error in the WordCountDemo, It keeps running as if the broker is still alive. [~mjsax] [~mwkohout] What should I do next. I am able to reproduce the issue. > When a Kafka Stream can't communicate with the server, it's Status stays > RUNNING > > > Key: KAFKA-6520 > URL: https://issues.apache.org/jira/browse/KAFKA-6520 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Michael Kohout >Assignee: Milind Jain >Priority: Major > Labels: newbie, user-experience > > When you execute the following scenario the application is always in RUNNING > state > > 1)start kafka > 2)start app, app connects to kafka and starts processing > 3)kill kafka(stop docker container) > 4)the application doesn't give any indication that it's no longer > connected(Stream State is still RUNNING, and the uncaught exception handler > isn't invoked) > > > It would be useful if the Stream State had a DISCONNECTED status. > > See > [this|https://groups.google.com/forum/#!topic/confluent-platform/nQh2ohgdrIQ] > for a discussion from the google user forum. 
> [This|https://issues.apache.org/jira/browse/KAFKA-4564] is a link to a > related issue. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
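Until a DISCONNECTED state exists, an application can at least observe the transitions Streams does report via {{KafkaStreams#setStateListener}}. A minimal sketch, assuming the Kafka Streams 1.1 client is on the classpath; the application id, bootstrap server, and topic name are placeholders:

```java
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class StateListenerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-demo"); // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic"); // placeholder topology

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        // Log every state transition. Note: losing the broker alone does not
        // currently move the state out of RUNNING -- that is this issue.
        streams.setStateListener((newState, oldState) ->
                System.out.println("Streams state: " + oldState + " -> " + newState));
        streams.start();
    }
}
```

The listener must be registered before {{start()}}; it fires for the transitions Streams does make (e.g. REBALANCING), which is useful for confirming that broker loss produces none.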
[jira] [Updated] (KAFKA-6842) initTransactions hangs when trying to connect to non-existing broker
[ https://issues.apache.org/jira/browse/KAFKA-6842?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alexander Gavrilov updated KAFKA-6842: -- Description: When I specify a non-existing broker as a bootstrap server, {{initTransactions}} hangs forever. Here is simple code to reproduce the issue:
{code:java}
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "example.com:49092");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 3000);
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 3000);
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "test_transactional_id");
Producer<String, byte[]> producer = new KafkaProducer<>(props, new StringSerializer(), new ByteArraySerializer());
producer.initTransactions();
{code}
The trace log file: {noformat} 979 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 1.1.0 979 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : fdcf75ea326b8e07 981 [main] DEBUG org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1, transactionalId=test_transactional_id] Kafka producer started 982 [main] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-1, transactionalId=test_transactional_id] Transition from state UNINITIALIZED to INITIALIZING 982 [main] INFO org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-1, transactionalId=test_transactional_id] ProducerId set to -1 with epoch -1 1147 [main] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-1, transactionalId=test_transactional_id] Enqueuing transactional request (type=InitProducerIdRequest, transactionalId=test_transactional_id, transactionTimeoutMs=6) 1147 
[kafka-producer-network-thread | producer-1] TRACE org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-1, transactionalId=test_transactional_id] Request (type=InitProducerIdRequest, transactionalId=test_transactional_id, transactionTimeoutMs=6) dequeued for sending 1148 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-1, transactionalId=test_transactional_id] Enqueuing transactional request (type=FindCoordinatorRequest, coordinatorKey=test_transactional_id, coordinatorType=TRANSACTION) 1148 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-1, transactionalId=test_transactional_id] Enqueuing transactional request (type=InitProducerIdRequest, transactionalId=test_transactional_id, transactionTimeoutMs=6) 1149 [kafka-producer-network-thread | producer-1] TRACE org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-1, transactionalId=test_transactional_id] Request (type=FindCoordinatorRequest, coordinatorKey=test_transactional_id, coordinatorType=TRANSACTION) dequeued for sending 1149 [kafka-producer-network-thread | producer-1] TRACE org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1, transactionalId=test_transactional_id] Found least loaded node example.com:49092 (id: -1 rack: null) 1150 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1, transactionalId=test_transactional_id] Initiating connection to node example.com:49092 (id: -1 rack: null) 4250 [kafka-producer-network-thread | producer-1] TRACE org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1, transactionalId=test_transactional_id] Found least loaded node example.com:49092 (id: -1 rack: null) 4250 [kafka-producer-network-thread | 
producer-1] TRACE org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1, transactionalId=test_transactional_id] Found least loaded node example.com:49092 (id: -1 rack: null) 4250 [kafka-producer-network-thread | producer-1] TRACE org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1, transactionalId=test_transactional_id] Found least loaded node example.com:49092 (id: -1 rack: null) 4300 [kafka-producer-network-thread | producer-1] TRACE org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1, transactionalId=test_transactional_id] Found least loaded node example.com:49092 (id: -1 rack: null) 4351 [kafka-producer-network-thread | producer-1] TRACE org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1, transactionalId=test_transactional_id] Found least loaded node example.com:49092 (id: -1 rack
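As the trace shows, the producer keeps cycling through "Found least loaded node" without ever failing, and in 1.1.0 neither {{max.block.ms}} nor {{request.timeout.ms}} bounds {{initTransactions()}}. Until that is fixed, a caller can bound the wait externally by running the call on a worker thread. A stdlib-only sketch; the sleeping task here is a stand-in for the hanging {{initTransactions()}} call:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedInit {
    // Runs task on a worker thread and waits at most timeoutMs for it to
    // finish; returns true if it completed, false if it hung or failed.
    public static boolean runWithTimeout(Runnable task, long timeoutMs)
            throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Future<?> f = pool.submit(task);
        try {
            f.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            f.cancel(true); // interrupt the hung call
            return false;
        } catch (ExecutionException e) {
            return false;
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Stand-in for producer.initTransactions() hanging on an unreachable broker.
        Runnable hang = () -> {
            try { Thread.sleep(60_000); } catch (InterruptedException ie) { /* cancelled */ }
        };
        System.out.println(runWithTimeout(hang, 200)); // false: timed out
    }
}
```

In real code the lambda would invoke {{producer.initTransactions()}} and the caller would close the producer on timeout; this only bounds the caller's wait, it does not make the producer itself fail fast.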
[jira] [Created] (KAFKA-6842) initTransactions hangs when trying to connect to non-existing broker
Alexander Gavrilov created KAFKA-6842: - Summary: initTransactions hangs when trying to connect to non-existing broker Key: KAFKA-6842 URL: https://issues.apache.org/jira/browse/KAFKA-6842 Project: Kafka Issue Type: Bug Components: producer Affects Versions: 1.1.0 Reporter: Alexander Gavrilov When I specify a non-existing broker as a bootstrap server, {{initTransactions}} hangs forever. Here is simple code to reproduce the issue:
{code:java}
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "example.com:49092");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 3000);
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 3000);
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "test_transactional_id");
Producer<String, byte[]> producer = new KafkaProducer<>(props, new StringSerializer(), new ByteArraySerializer());
producer.initTransactions();
{code}
The trace log file: {noformat} 979 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 1.1.0 979 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : fdcf75ea326b8e07 981 [main] DEBUG org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1, transactionalId=test_transactional_id] Kafka producer started 982 [main] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-1, transactionalId=test_transactional_id] Transition from state UNINITIALIZED to INITIALIZING 982 [main] INFO org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-1, transactionalId=test_transactional_id] ProducerId set to -1 with epoch -1 1147 [main] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-1, transactionalId=test_transactional_id] Enqueuing 
transactional request (type=InitProducerIdRequest, transactionalId=test_transactional_id, transactionTimeoutMs=6) 1147 [kafka-producer-network-thread | producer-1] TRACE org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-1, transactionalId=test_transactional_id] Request (type=InitProducerIdRequest, transactionalId=test_transactional_id, transactionTimeoutMs=6) dequeued for sending 1148 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-1, transactionalId=test_transactional_id] Enqueuing transactional request (type=FindCoordinatorRequest, coordinatorKey=test_transactional_id, coordinatorType=TRANSACTION) 1148 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-1, transactionalId=test_transactional_id] Enqueuing transactional request (type=InitProducerIdRequest, transactionalId=test_transactional_id, transactionTimeoutMs=6) 1149 [kafka-producer-network-thread | producer-1] TRACE org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-1, transactionalId=test_transactional_id] Request (type=FindCoordinatorRequest, coordinatorKey=test_transactional_id, coordinatorType=TRANSACTION) dequeued for sending 1149 [kafka-producer-network-thread | producer-1] TRACE org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1, transactionalId=test_transactional_id] Found least loaded node example.com:49092 (id: -1 rack: null) 1150 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1, transactionalId=test_transactional_id] Initiating connection to node example.com:49092 (id: -1 rack: null) 4250 [kafka-producer-network-thread | producer-1] TRACE org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1, 
transactionalId=test_transactional_id] Found least loaded node example.com:49092 (id: -1 rack: null) 4250 [kafka-producer-network-thread | producer-1] TRACE org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1, transactionalId=test_transactional_id] Found least loaded node example.com:49092 (id: -1 rack: null) 4250 [kafka-producer-network-thread | producer-1] TRACE org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1, transactionalId=test_transactional_id] Found least loaded node example.com:49092 (id: -1 rack: null) 4300 [kafka-producer-network-thread | producer-1] TRACE org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1, transactionalId=test_transactional_id] Found least loaded node example.com:49092 (id: -1 rack: null) 4351 [ka
[jira] [Commented] (KAFKA-6839) ZK session retry with cname record
[ https://issues.apache.org/jira/browse/KAFKA-6839?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16459913#comment-16459913 ] Tyler Monahan commented on KAFKA-6839: -- Interesting, I wasn't aware of that setting. I have modified it and will see whether the issue is resolved the next time my ELB DNS records change. > ZK session retry with cname record > -- > > Key: KAFKA-6839 > URL: https://issues.apache.org/jira/browse/KAFKA-6839 > Project: Kafka > Issue Type: Bug >Affects Versions: 1.1.0 >Reporter: Tyler Monahan >Priority: Major > > I have a 3-node Kafka cluster in AWS that talks to a 3-node ZK cluster > behind an ELB. I give the Kafka instances a DNS CNAME record that points > to the AWS ELB, which is another CNAME record pointing to two A records. When > the AWS ELB CNAME record changes the two A records it points at, and > Kafka tries to reconnect to ZK after losing a session, it uses the old A > records and not the new ones, so the reconnect attempt fails. There appears to > be some kind of caching instead of using the record that is set in the config > file. > This is the error message I am seeing in the broker logs: > {code:java} > [2018-04-30 20:09:21,449] INFO Opening socket connection to server > ip-10-65-68-244.us-west-2.compute.internal/10.65.68.244:2181. Will not > attempt to authenticate using SASL (unknown error) > (org.apache.zookeeper.ClientCnxn) > [2018-04-30 20:09:24,450] WARN Client session timed out, have not heard from > server in 3962ms for sessionid 0x263094512190001 > (org.apache.zookeeper.ClientCnxn) > [2018-04-30 20:09:24,451] INFO Client session timed out, have not heard from > server in 3962ms for sessionid 0x263094512190001, closing socket connection > and attempting reconnect (org.apache.zookeeper.ClientCnxn) > [2018-04-30 20:09:26,532] INFO Opening socket connection to server > ip-10-65-84-102.us-west-2.compute.internal/10.65.84.102:2181. 
Will not > attempt to authenticate using SASL (unknown error) > (org.apache.zookeeper.ClientCnxn) > [2018-04-30 20:09:29,531] WARN Session 0x263094512190001 for server null, > unexpected error, closing socket connection and attempting reconnect > (org.apache.zookeeper.ClientCnxn) > java.net.NoRouteToHostException: No route to host > at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) > at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) > at > org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361) > at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
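The setting referred to above is presumably the JVM-level DNS cache: by default the JVM can cache successful lookups for a long time (indefinitely under a security manager), which matches the stale-A-record symptom. The TTL can be tightened programmatically before any lookup occurs; a sketch, with the 60-second value as an arbitrary example:

```java
import java.security.Security;

public class DnsTtl {
    public static void main(String[] args) {
        // Cache successful DNS lookups for at most 60 seconds,
        // and do not cache failed lookups at all. Must run before
        // the first lookup to take effect.
        Security.setProperty("networkaddress.cache.ttl", "60");
        Security.setProperty("networkaddress.cache.negative.ttl", "0");
        System.out.println(Security.getProperty("networkaddress.cache.ttl")); // 60
    }
}
```

The same knobs are also settable via {{$JAVA_HOME/jre/lib/security/java.security}}, which avoids touching application code.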
[jira] [Resolved] (KAFKA-3417) Invalid characters in config properties not being validated?
[ https://issues.apache.org/jira/browse/KAFKA-3417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Manikumar resolved KAFKA-3417. -- Resolution: Fixed Assignee: Mickael Maison (was: Grant Henke) Fix Version/s: 2.0.0 > Invalid characters in config properties not being validated? > > > Key: KAFKA-3417 > URL: https://issues.apache.org/jira/browse/KAFKA-3417 > Project: Kafka > Issue Type: Bug > Components: config >Affects Versions: 0.9.0.1 >Reporter: Byron Ruth >Assignee: Mickael Maison >Priority: Minor > Fix For: 2.0.0 > > > I ran into an error using a {{client.id}} with invalid characters (per the > [config > validator|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/common/Config.scala#L25-L35]). > I was able to get that exact error using the {{kafka-console-consumer}} > script, presumably because I supplied a consumer properties file and it > validated prior to hitting the server. However, when I use a client library > (sarama for Go in this case), an error in the metrics subsystem is thrown > [here|https://github.com/apache/kafka/blob/977ebbe9bafb6c1a6e1be69620f745712118fe80/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java#L380]. 
> The stacktrace is: > {code:title=stack.java} > [2016-03-17 17:43:47,342] ERROR [KafkaApi-0] error when handling request > Name: FetchRequest; Version: 0; CorrelationId: 2; ClientId: foo:bar; > ReplicaId: -1; MaxWait: 250 ms; MinBytes: 1 bytes; RequestInfo: [foo,0] -> > PartitionFetchInfo(0,32768) (kafka.server.KafkaApis) > org.apache.kafka.common.KafkaException: Error creating mbean attribute for > metricName :MetricName [name=throttle-time, group=Fetch, description=Tracking > average throttle-time per client, tags={client-id=foo:bar}] > at > org.apache.kafka.common.metrics.JmxReporter.addAttribute(JmxReporter.java:113) > at > org.apache.kafka.common.metrics.JmxReporter.metricChange(JmxReporter.java:76) > at > org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:288) > at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:177) > at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:162) > ... > {code} > Assuming the cause is related to the invalid characters, shouldn't the > {{clientId}} be validated when the request header is decoded, before it is used? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
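The validator linked above accepts only ASCII alphanumerics, '.', '_' and '-', so "foo:bar" fails on the colon. A client-side pre-check is straightforward; a sketch whose pattern mirrors the character class in Config.scala (the class and method names are ours, not Kafka's):

```java
import java.util.regex.Pattern;

public class ClientIdCheck {
    // Same character class as the kafka.common.Config validation linked above.
    private static final Pattern LEGAL = Pattern.compile("[a-zA-Z0-9._-]*");

    public static boolean isValid(String clientId) {
        return clientId != null && LEGAL.matcher(clientId).matches();
    }

    public static void main(String[] args) {
        System.out.println(isValid("my-consumer.v1")); // true
        System.out.println(isValid("foo:bar"));        // false: ':' is not allowed
    }
}
```

Running such a check before constructing the client would surface the error at configuration time instead of deep inside the broker's metrics subsystem.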
[jira] [Commented] (KAFKA-6738) Kafka Connect handling of bad data
[ https://issues.apache.org/jira/browse/KAFKA-6738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16459820#comment-16459820 ] Konstantine Karantasis commented on KAFKA-6738: --- Yes, [~rsaez], this issue represents the work you describe. I'm assigning it to [~wicknicks] who will be working on all the related pieces, including the KIP and the implementation. > Kafka Connect handling of bad data > -- > > Key: KAFKA-6738 > URL: https://issues.apache.org/jira/browse/KAFKA-6738 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 1.1.0 >Reporter: Randall Hauch >Assignee: Konstantine Karantasis >Priority: Critical > Fix For: 2.0.0 > > > Kafka Connect connectors and tasks fail when they run into an unexpected > situation or error, but the framework should provide more general "bad data > handling" options, including (perhaps among others): > # fail fast, which is what we do today (assuming connector actually fails and > doesn't eat errors) > # retry (possibly with configs to limit) > # drop data and move on > # dead letter queue > This needs to be addressed in a way that handles errors from: > # The connector itself (e.g. connectivity issues to the other system) > # Converters/serializers (bad data, unexpected format, etc) > # SMTs > # Ideally the framework as well, though we obviously want to fix known bugs > anyway -- This message was sent by Atlassian JIRA (v7.6.3#76005)
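The four options listed above can be modeled as a per-connector policy applied around each conversion/transformation step. A stdlib-only sketch of the control flow; the names and shape are illustrative, not the Connect API that the KIP eventually defined:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Function;

public class ErrorPolicySketch {
    public enum Policy { FAIL_FAST, RETRY, SKIP, DEAD_LETTER }

    // Stand-in for a dead letter queue topic.
    public static final Deque<String> deadLetters = new ArrayDeque<>();

    // Applies one conversion/transformation step to a record, handling
    // failures according to the configured policy.
    public static String apply(Function<String, String> step, String record,
                               Policy policy, int maxRetries) {
        for (int attempt = 0; ; attempt++) {
            try {
                return step.apply(record);
            } catch (RuntimeException e) {
                switch (policy) {
                    case RETRY:
                        if (attempt < maxRetries) continue; // try again
                        // retries exhausted: fall through and fail
                    case FAIL_FAST:
                        throw e;
                    case SKIP:
                        return null; // drop the record and move on
                    case DEAD_LETTER:
                        deadLetters.add(record); // park it for later inspection
                        return null;
                }
            }
        }
    }

    public static void main(String[] args) {
        apply(s -> { throw new RuntimeException("unparseable"); },
              "bad-record", Policy.DEAD_LETTER, 0);
        System.out.println(deadLetters); // [bad-record]
    }
}
```

The point of the sketch is that the same wrapper can sit around converters, SMTs, and connector calls alike, which is the "handles errors from" list in the description.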
[jira] [Assigned] (KAFKA-6738) Kafka Connect handling of bad data
[ https://issues.apache.org/jira/browse/KAFKA-6738?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Konstantine Karantasis reassigned KAFKA-6738: - Assignee: Arjun Satish (was: Konstantine Karantasis) > Kafka Connect handling of bad data > -- > > Key: KAFKA-6738 > URL: https://issues.apache.org/jira/browse/KAFKA-6738 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 1.1.0 >Reporter: Randall Hauch >Assignee: Arjun Satish >Priority: Critical > Fix For: 2.0.0 > > > Kafka Connect connectors and tasks fail when they run into an unexpected > situation or error, but the framework should provide more general "bad data > handling" options, including (perhaps among others): > # fail fast, which is what we do today (assuming connector actually fails and > doesn't eat errors) > # retry (possibly with configs to limit) > # drop data and move on > # dead letter queue > This needs to be addressed in a way that handles errors from: > # The connector itself (e.g. connectivity issues to the other system) > # Converters/serializers (bad data, unexpected format, etc) > # SMTs > # Ideally the framework as well, though we obviously want to fix known bugs > anyway -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6738) Kafka Connect handling of bad data
[ https://issues.apache.org/jira/browse/KAFKA-6738?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Konstantine Karantasis updated KAFKA-6738: -- Fix Version/s: 2.0.0 > Kafka Connect handling of bad data > -- > > Key: KAFKA-6738 > URL: https://issues.apache.org/jira/browse/KAFKA-6738 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 1.1.0 >Reporter: Randall Hauch >Assignee: Konstantine Karantasis >Priority: Critical > Fix For: 2.0.0 > > > Kafka Connect connectors and tasks fail when they run into an unexpected > situation or error, but the framework should provide more general "bad data > handling" options, including (perhaps among others): > # fail fast, which is what we do today (assuming connector actually fails and > doesn't eat errors) > # retry (possibly with configs to limit) > # drop data and move on > # dead letter queue > This needs to be addressed in a way that handles errors from: > # The connector itself (e.g. connectivity issues to the other system) > # Converters/serializers (bad data, unexpected format, etc) > # SMTs > # Ideally the framework as well, though we obviously want to fix known bugs > anyway -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-3143) inconsistent state in ZK when all replicas are dead
[ https://issues.apache.org/jira/browse/KAFKA-3143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16459817#comment-16459817 ] Manikumar commented on KAFKA-3143: -- From 1.1.0 (KAFKA-5083), the last ISR is preserved in ZK, irrespective of whether unclean leader election is enabled. > inconsistent state in ZK when all replicas are dead > --- > > Key: KAFKA-3143 > URL: https://issues.apache.org/jira/browse/KAFKA-3143 > Project: Kafka > Issue Type: Bug >Reporter: Jun Rao >Assignee: Ismael Juma >Priority: Major > Labels: reliability > Fix For: 2.0.0 > > > This issue can be recreated in the following steps. > 1. Start 3 brokers, 1, 2 and 3. > 2. Create a topic with a single partition and 2 replicas, say on broker 1 and > 2. > If we stop both replicas 1 and 2, depending on where the controller is, the > leader and isr stored in ZK in the end are different. > If the controller is on broker 3, what's stored in ZK will be -1 for leader > and an empty set for ISR. > On the other hand, if the controller is on broker 2 and we stop broker 1 > followed by broker 2, what's stored in ZK will be 2 for leader and 2 for ISR. > The issue is that in the first case, the controller will call > ReplicaStateMachine to transition to OfflineReplica, which will change the > leader and isr. However, in the second case, the controller fails over, but > we don't transition ReplicaStateMachine to OfflineReplica during controller > initialization. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-5896) Kafka Connect task threads never interrupted
[ https://issues.apache.org/jira/browse/KAFKA-5896?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nick Pillitteri resolved KAFKA-5896. Resolution: Unresolved > Kafka Connect task threads never interrupted > > > Key: KAFKA-5896 > URL: https://issues.apache.org/jira/browse/KAFKA-5896 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Nick Pillitteri >Assignee: Nick Pillitteri >Priority: Minor > > h2. Problem > Kafka Connect tasks associated with connectors are run in their own threads. > When tasks are stopped or restarted, a flag is set - {{stopping}} - to > indicate the task should stop processing records. However, if the thread the > task is running in is blocked (waiting for a lock or performing I/O) it's > possible the task will never stop. > I've created a connector specifically to demonstrate this issue (along with > some more detailed instructions for reproducing the issue): > https://github.com/smarter-travel-media/hang-connector > I believe this is an issue because it means that a single badly behaved > connector (any connector that does I/O without timeouts) can cause the Kafka > Connect worker to get into a state where the only solution is to restart the > JVM. > I think, but couldn't reproduce, that this is the cause of this problem on > Stack Overflow: > https://stackoverflow.com/questions/43802156/inconsistent-connector-state-connectexception-task-already-exists-in-this-work > h2. Expected Result > I would expect the Worker to eventually interrupt the thread that the task is > running in. In the past across various other libraries, this is what I've > seen done when a thread needs to be forcibly stopped. > h2. Actual Result > In actuality, the Worker sets a {{stopping}} flag and lets the thread run > indefinitely. It uses a timeout while waiting for the task to stop but after > this timeout has expired it simply sets a {{cancelled}} flag. 
This means that > every time a task is restarted, a new thread running the task will be > created. Thus a task may end up with multiple instances all running in their > own threads when there's only supposed to be a single thread. > h2. Steps to Reproduce > The problem can be replicated by using the connector available here: > https://github.com/smarter-travel-media/hang-connector > Apologies for how involved the steps are. > I've created a patch that forcibly interrupts threads after they fail to > gracefully shutdown here: > https://github.com/smarter-travel-media/kafka/commit/295c747a9fd82ee8b30556c89c31e0bfcce5a2c5 > I've confirmed that this fixes the issue. I can add some unit tests and > submit a PR if people agree that this is a bug and interrupting threads is > the right fix. > Thanks! -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (KAFKA-5896) Kafka Connect task threads never interrupted
[ https://issues.apache.org/jira/browse/KAFKA-5896?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nick Pillitteri closed KAFKA-5896. -- > Kafka Connect task threads never interrupted > > > Key: KAFKA-5896 > URL: https://issues.apache.org/jira/browse/KAFKA-5896 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Nick Pillitteri >Assignee: Nick Pillitteri >Priority: Minor > > h2. Problem > Kafka Connect tasks associated with connectors are run in their own threads. > When tasks are stopped or restarted, a flag is set - {{stopping}} - to > indicate the task should stop processing records. However, if the thread the > task is running in is blocked (waiting for a lock or performing I/O) it's > possible the task will never stop. > I've created a connector specifically to demonstrate this issue (along with > some more detailed instructions for reproducing the issue): > https://github.com/smarter-travel-media/hang-connector > I believe this is an issue because it means that a single badly behaved > connector (any connector that does I/O without timeouts) can cause the Kafka > Connect worker to get into a state where the only solution is to restart the > JVM. > I think, but couldn't reproduce, that this is the cause of this problem on > Stack Overflow: > https://stackoverflow.com/questions/43802156/inconsistent-connector-state-connectexception-task-already-exists-in-this-work > h2. Expected Result > I would expect the Worker to eventually interrupt the thread that the task is > running in. In the past across various other libraries, this is what I've > seen done when a thread needs to be forcibly stopped. > h2. Actual Result > In actuality, the Worker sets a {{stopping}} flag and lets the thread run > indefinitely. It uses a timeout while waiting for the task to stop but after > this timeout has expired it simply sets a {{cancelled}} flag. 
This means that > every time a task is restarted, a new thread running the task will be > created. Thus a task may end up with multiple instances all running in their > own threads when there's only supposed to be a single thread. > h2. Steps to Reproduce > The problem can be replicated by using the connector available here: > https://github.com/smarter-travel-media/hang-connector > Apologies for how involved the steps are. > I've created a patch that forcibly interrupts threads after they fail to > gracefully shutdown here: > https://github.com/smarter-travel-media/kafka/commit/295c747a9fd82ee8b30556c89c31e0bfcce5a2c5 > I've confirmed that this fixes the issue. I can add some unit tests and > submit a PR if people agree that this is a bug and interrupting threads is > the right fix. > Thanks! -- This message was sent by Atlassian JIRA (v7.6.3#76005)
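The fix described in the issue boils down to plain {{java.util.concurrent}}: keep the {{Future}} returned by {{submit}} and call {{cancel(true)}} once the graceful stop times out. A stdlib sketch of that pattern, where a blocking sleep stands in for a task stuck on I/O without a timeout:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class ForcedStopSketch {
    // Submits a task that blocks indefinitely, then stops it the way the
    // proposed patch does: cancel(true) on its Future after the graceful
    // stop has failed.
    public static boolean stopBlockedTask() throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch interrupted = new CountDownLatch(1);

        Future<?> future = executor.submit(() -> {
            started.countDown();
            try {
                Thread.sleep(60_000); // stands in for blocking I/O with no timeout
            } catch (InterruptedException e) {
                interrupted.countDown(); // the interrupt is the only way out
            }
        });

        started.await();     // make sure the task is actually running
        future.cancel(true); // graceful stop "timed out": interrupt the thread
        boolean stopped = interrupted.await(5, TimeUnit.SECONDS);
        executor.shutdown();
        return stopped;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(stopBlockedTask()); // true: the blocked task was interrupted
    }
}
```

This is why setting a {{stopping}} flag alone is insufficient: a thread blocked inside {{sleep}}, a lock wait, or interruptible I/O never re-checks the flag, but {{cancel(true)}} delivers an interrupt that unblocks it.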
[jira] [Commented] (KAFKA-5896) Kafka Connect task threads never interrupted
[ https://issues.apache.org/jira/browse/KAFKA-5896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16459799#comment-16459799 ] ASF GitHub Bot commented on KAFKA-5896: --- 56quarters closed pull request #3876: KAFKA-5896: Force Connect tasks to stop via thread interruption URL: https://github.com/apache/kafka/pull/3876 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java index c6e2e173834..b9f96128f6e 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java @@ -55,6 +55,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; /** @@ -83,6 +84,9 @@ private final ConcurrentMap<String, WorkerConnector> connectors = new ConcurrentHashMap<>(); private final ConcurrentMap<ConnectorTaskId, WorkerTask> tasks = new ConcurrentHashMap<>(); +private final ConcurrentMap<ConnectorTaskId, Future<?>> futures = new ConcurrentHashMap<>(); +private final Object taskAndFutureLock = new Object(); + private SourceTaskOffsetCommitter sourceTaskOffsetCommitter; public Worker( @@ -414,11 +418,14 @@ public boolean startTask( return false; } -WorkerTask existing = tasks.putIfAbsent(id, workerTask); -if (existing != null) -throw new ConnectException("Task already exists in this worker: " + id); +synchronized (taskAndFutureLock) { +WorkerTask existing = tasks.putIfAbsent(id, workerTask); +if (existing != null) +throw new ConnectException("Task already exists in this worker: " + id); + +futures.put(id, executor.submit(workerTask)); +} 
-executor.submit(workerTask); if (workerTask instanceof WorkerSourceTask) { sourceTaskOffsetCommitter.schedule(id, (WorkerSourceTask) workerTask); } @@ -483,18 +490,41 @@ private void stopTasks(Collection<ConnectorTaskId> ids) { } private void awaitStopTask(ConnectorTaskId taskId, long timeout) { -WorkerTask task = tasks.remove(taskId); +WorkerTask task; +Future<?> future; + +synchronized (taskAndFutureLock) { +task = tasks.remove(taskId); +future = futures.remove(taskId); +} + if (task == null) { log.warn("Ignoring await stop request for non-present task {}", taskId); return; } if (!task.awaitStop(timeout)) { -log.error("Graceful stop of task {} failed.", task.id()); +log.error("Graceful stop of task {} failed. Cancelling and forcibly interrupting.", task.id()); task.cancel(); + +if (future == null) { +log.warn("No associated Future found for task {}", taskId); +return; +} + +// Interrupt the thread that the task is running in since it hasn't stopped on its +// own by this point. This prevents scenarios where a task runs indefinitely because +// it's blocked on something (lock, network I/O, etc.). 
+future.cancel(true); } } +// Visible for testing +boolean isTaskFutureRunning(ConnectorTaskId taskId) { +final Future<?> future = futures.get(taskId); +return future != null && !future.isDone(); +} + private void awaitStopTasks(Collection<ConnectorTaskId> ids) { long now = time.milliseconds(); long deadline = now + config.getLong(WorkerConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java index 80c65df7ff4..361cec77a94 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java @@ -34,6 +34,8 @@ import org.apache.kafka.connect.runtime.isolation.PluginClassLoader; import org.apache.kafka.connect.runtime.isolation.Plugins; import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; +import org.apache.kafka.connect.sink.SinkConnector; +import org.apache.kafka.connect.sink.SinkRecord; import org.apache.kafka.connect.sink.SinkTask; import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceTask; @@ -57,14 +59,18 @@ import org.powermock.modules.junit4.PowerMockRunner;
[jira] [Commented] (KAFKA-4701) Allow kafka brokers to dynamically reload truststore without restarting.
[ https://issues.apache.org/jira/browse/KAFKA-4701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16459776#comment-16459776 ] Allen Xiang commented on KAFKA-4701: KAFKA-6810 kind of works, but here is the catch: it requires a new password or trust store path to trigger a reload. What if we add some new certs to the same trust store and do not change the password? Is a restart still needed to do the reload? > Allow kafka brokers to dynamically reload truststore without restarting. > > > Key: KAFKA-4701 > URL: https://issues.apache.org/jira/browse/KAFKA-4701 > Project: Kafka > Issue Type: Improvement > Components: security >Reporter: Allen Xiang >Priority: Major > Labels: security > Fix For: 2.0.0 > > > Right now in order to add SSL clients(update broker truststores), a rolling > restart of all brokers is required. This is very time consuming and > unnecessary. A dynamic truststore manager is needed to reload truststore from > file system without restarting brokers. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6841) Add support for wildcard suffixed ACLs
Piyush Vijay created KAFKA-6841: --- Summary: Add support for wildcard suffixed ACLs Key: KAFKA-6841 URL: https://issues.apache.org/jira/browse/KAFKA-6841 Project: Kafka Issue Type: New Feature Components: admin, security Reporter: Piyush Vijay Fix For: 1.0.2, 1.1.1 Kafka supports authorizing access to resources like topics, consumer groups etc. by way of ACLs. The currently supported semantics for resource and principal names in an ACL definition are either the full resource/principal name or the special wildcard '*', which matches everything. Kafka should support a way of defining bulk ACLs instead of specifying individual ACLs. The details for the feature are available here - [https://cwiki.apache.org/confluence/display/KAFKA/KIP-290%3A+Support+for+wildcard+suffixed+ACLs] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
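KIP-290's wildcard-suffixed ACLs boil down to prefix matching on resource names. A hedged, stdlib-only sketch of that matching rule (purely illustrative; this is not Kafka's actual authorizer code, and the final KIP may define precedence between overlapping patterns differently):

```java
public class AclResourceMatcher {
    /**
     * Returns true if the ACL resource pattern grants access to the given
     * resource name: an exact match, the full wildcard "*", or a
     * wildcard-suffixed pattern such as "payments.*" matching by prefix.
     */
    public static boolean matches(String aclPattern, String resourceName) {
        if (aclPattern.equals("*")) {
            return true;                        // full wildcard matches everything
        }
        if (aclPattern.endsWith("*")) {         // wildcard-suffixed pattern
            String prefix = aclPattern.substring(0, aclPattern.length() - 1);
            return resourceName.startsWith(prefix);
        }
        return aclPattern.equals(resourceName); // literal resource name
    }
}
```

This is the "bulk ACL" win: one entry `payments.*` covers every topic a team creates under that prefix, instead of one ACL per topic.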
[jira] [Commented] (KAFKA-4701) Allow kafka brokers to dynamically reload truststore without restarting.
[ https://issues.apache.org/jira/browse/KAFKA-4701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16459756#comment-16459756 ] Manikumar commented on KAFKA-4701: -- Can this be resolved as duplicate of KAFKA-6810? > Allow kafka brokers to dynamically reload truststore without restarting. > > > Key: KAFKA-4701 > URL: https://issues.apache.org/jira/browse/KAFKA-4701 > Project: Kafka > Issue Type: Improvement > Components: security >Reporter: Allen Xiang >Priority: Major > Labels: security > Fix For: 2.0.0 > > > Right now in order to add SSL clients(update broker truststores), a rolling > restart of all brokers is required. This is very time consuming and > unnecessary. A dynamic truststore manager is needed to reload truststore from > file system without restarting brokers. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-6826) Avoid range scans when forwarding values in window store aggregations
[ https://issues.apache.org/jira/browse/KAFKA-6826?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-6826. -- Resolution: Fixed Fix Version/s: 2.0.0 > Avoid range scans when forwarding values in window store aggregations > - > > Key: KAFKA-6826 > URL: https://issues.apache.org/jira/browse/KAFKA-6826 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Xavier Léauté >Assignee: Xavier Léauté >Priority: Critical > Fix For: 2.0.0 > > Attachments: Screen Shot 2018-04-25 at 11.14.25 AM.png > > > This is a follow-up to KAFKA-6560, where we missed at least one case that > should be using single point queries instead of range-scans when forwarding > values during aggregation. > Since a single range scan can sometimes account for 75% of aggregation cpu > time, fixing this should provide some significant speedups (see attached > flamegraph) > !Screen Shot 2018-04-25 at 11.14.25 AM.png|width=797! -- This message was sent by Atlassian JIRA (v7.6.3#76005)
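The fix in KAFKA-6826 replaces range scans with single-point lookups when the exact window is already known. A toy stdlib illustration of the difference (a `TreeMap` of windowStart to aggregate for one record key is a hypothetical stand-in; Kafka Streams' real window stores are segmented RocksDB instances):

```java
import java.util.Map;
import java.util.TreeMap;

public class WindowLookup {
    // Toy window store for a single record key: windowStart -> aggregate value.
    final TreeMap<Long, Long> store = new TreeMap<>();

    // What the fix moves to: a single point query when the window is known. O(log n).
    Long pointGet(long windowStart) {
        return store.get(windowStart);
    }

    // What the aggregation was doing: scanning a range of windows to find one
    // value, paying iterator setup and traversal cost for a single hit.
    Long rangeGet(long from, long to) {
        for (Map.Entry<Long, Long> e : store.subMap(from, true, to, true).entrySet()) {
            return e.getValue(); // only the first entry is wanted; the scan is wasted work
        }
        return null;
    }
}
```

Both calls return the same value when the window boundary is known up front, which is why the point query is the strictly cheaper choice in the aggregation path.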
[jira] [Resolved] (KAFKA-6526) Update controller to handle changes to unclean.leader.election.enable
[ https://issues.apache.org/jira/browse/KAFKA-6526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram resolved KAFKA-6526. --- Resolution: Fixed Reviewer: Dong Lin > Update controller to handle changes to unclean.leader.election.enable > - > > Key: KAFKA-6526 > URL: https://issues.apache.org/jira/browse/KAFKA-6526 > Project: Kafka > Issue Type: Improvement >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Major > Fix For: 2.0.0 > > > At the moment, updates to default unclean.leader.election.enable uses the > same code path as updates to topic overrides. This requires controller change > for the new value to take effect. It will be good if we can update the > controller to handle the change. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6526) Update controller to handle changes to unclean.leader.election.enable
[ https://issues.apache.org/jira/browse/KAFKA-6526?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16459715#comment-16459715 ] ASF GitHub Bot commented on KAFKA-6526: --- rajinisivaram closed pull request #4920: KAFKA-6526: Enable unclean leader election without controller change URL: https://github.com/apache/kafka/pull/4920 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:
diff --git a/core/src/main/scala/kafka/controller/ControllerState.scala b/core/src/main/scala/kafka/controller/ControllerState.scala
index 17af7775901..d2473058ac7 100644
--- a/core/src/main/scala/kafka/controller/ControllerState.scala
+++ b/core/src/main/scala/kafka/controller/ControllerState.scala
@@ -90,7 +90,11 @@ object ControllerState {
     def value = 12
   }
 
+  case object UncleanLeaderElectionEnable extends ControllerState {
+    def value = 13
+  }
+
   val values: Seq[ControllerState] = Seq(Idle, ControllerChange, BrokerChange, TopicChange, TopicDeletion,
     PartitionReassignment, AutoLeaderBalance, ManualLeaderBalance, ControlledShutdown, IsrChange, LeaderAndIsrResponseReceived,
-    LogDirChange, ControllerShutdown)
+    LogDirChange, ControllerShutdown, UncleanLeaderElectionEnable)
 }
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index eee625ef663..bc721e39f96 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -195,6 +195,10 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     zkClient.updateBrokerInfoInZk(newBrokerInfo)
   }
 
+  private[kafka] def enableDefaultUncleanLeaderElection(): Unit = {
+    eventManager.put(UncleanLeaderElectionEnable)
+  }
+
   private def state: ControllerState = eventManager.state
 
   /**
@@ -1009,6 +1013,16 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     }
   }
 
+  case object UncleanLeaderElectionEnable extends ControllerEvent {
+
+    def state = ControllerState.UncleanLeaderElectionEnable
+
+    override def process(): Unit = {
+      if (!isActive) return
+      partitionStateMachine.triggerOnlinePartitionStateChange()
+    }
+  }
+
   case class ControlledShutdown(id: Int, controlledShutdownCallback: Try[Set[TopicPartition]] => Unit) extends ControllerEvent {
 
     def state = ControllerState.ControlledShutdown
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index be0ed6b1999..004b531da1b 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -156,7 +156,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
     addBrokerReconfigurable(new DynamicThreadPool(kafkaServer))
     if (kafkaServer.logManager.cleaner != null)
       addBrokerReconfigurable(kafkaServer.logManager.cleaner)
-    addReconfigurable(new DynamicLogConfig(kafkaServer.logManager))
+    addReconfigurable(new DynamicLogConfig(kafkaServer.logManager, kafkaServer))
     addReconfigurable(new DynamicMetricsReporters(kafkaConfig.brokerId, kafkaServer))
     addReconfigurable(new DynamicClientQuotaCallback(kafkaConfig.brokerId, kafkaServer))
     addBrokerReconfigurable(new DynamicListenerConfig(kafkaServer))
@@ -501,7 +501,7 @@ object DynamicLogConfig {
   val ReconfigurableConfigs = LogConfig.TopicConfigSynonyms.values.toSet -- ExcludedConfigs
   val KafkaConfigToLogConfigName = LogConfig.TopicConfigSynonyms.map { case (k, v) => (v, k) }
 }
-class DynamicLogConfig(logManager: LogManager) extends Reconfigurable with Logging {
+class DynamicLogConfig(logManager: LogManager, server: KafkaServer) extends Reconfigurable with Logging {
 
   override def configure(configs: util.Map[String, _]): Unit = {}
@@ -517,6 +517,7 @@ class DynamicLogConfig(logManager: LogManager) extends Reconfigurable with Loggi
   override def reconfigure(configs: util.Map[String, _]): Unit = {
     val currentLogConfig = logManager.currentDefaultConfig
+    val origUncleanLeaderElectionEnable = logManager.currentDefaultConfig.uncleanLeaderElectionEnable
     val newBrokerDefaults = new util.HashMap[String, Object](currentLogConfig.originals)
     configs.asScala.filterKeys(DynamicLogConfig.ReconfigurableConfigs.contains).foreach { case (k, v) =>
       if (v != null) {
@@ -536,6 +537,9 @@ class DynamicLogConfig(logManager: LogManager) extends Reconfigurable with Loggi
         val logConfig = LogConfig(props.asJava)
         log.updateConfig(newBrokerDefaults.asScala.keySe
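The patch above routes the config change through the controller's event queue (a new `UncleanLeaderElectionEnable` event handled by `eventManager`) rather than applying it inline. A simplified stdlib sketch of that single-threaded event-manager pattern (hypothetical names; the real controller's event manager is Scala and considerably richer):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class ControllerEventManager {
    interface ControllerEvent { void process(); }

    private final BlockingQueue<ControllerEvent> queue = new LinkedBlockingQueue<>();
    private final AtomicBoolean running = new AtomicBoolean(true);
    private final Thread worker = new Thread(() -> {
        // Events are processed one at a time on a single thread, so handlers
        // (e.g. "re-trigger online partition state change") need no extra locking.
        while (running.get() || !queue.isEmpty()) {
            try {
                ControllerEvent e = queue.poll(10, TimeUnit.MILLISECONDS);
                if (e != null) e.process();
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    });

    public ControllerEventManager() { worker.start(); }

    // Any thread (here: the dynamic-config update path) can enqueue an event.
    public void put(ControllerEvent event) { queue.add(event); }

    // Stop accepting the running flag, then drain remaining events and exit.
    public void shutdown() throws InterruptedException {
        running.set(false);
        worker.join();
    }
}
```

The design choice mirrored here is the key point of the fix: broker-config reconfiguration runs on a different thread than the controller, so the safe way to act on the new `unclean.leader.election.enable` value is to hand the controller an event rather than mutate its state directly.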
[jira] [Updated] (KAFKA-6188) Broker fails with FATAL Shutdown - log dirs have failed
[ https://issues.apache.org/jira/browse/KAFKA-6188?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Valentina Baljak updated KAFKA-6188: Affects Version/s: 1.0.1 > Broker fails with FATAL Shutdown - log dirs have failed > --- > > Key: KAFKA-6188 > URL: https://issues.apache.org/jira/browse/KAFKA-6188 > Project: Kafka > Issue Type: Bug > Components: clients, log >Affects Versions: 1.0.0, 1.0.1 > Environment: Windows 10 >Reporter: Valentina Baljak >Priority: Blocker > Labels: windows > > Just started with version 1.0.0 after 4-5 months of using 0.10.2.1. The > test environment is very simple, with only one producer and one consumer. > Initially, everything started fine, standalone tests worked as expected. > However, running my code, Kafka clients fail after approximately 10 minutes. > Kafka won't start after that and it fails with the same error. > Deleting logs helps to start again, and the same problem occurs. > Here is the error traceback: > [2017-11-08 08:21:57,532] INFO Starting log cleanup with a period of 30 > ms. (kafka.log.LogManager) > [2017-11-08 08:21:57,548] INFO Starting log flusher with a default period of > 9223372036854775807 ms. (kafka.log.LogManager) > [2017-11-08 08:21:57,798] INFO Awaiting socket connections on 0.0.0.0:9092. 
> (kafka.network.Acceptor) > [2017-11-08 08:21:57,813] INFO [SocketServer brokerId=0] Started 1 acceptor > threads (kafka.network.SocketServer) > [2017-11-08 08:21:57,829] INFO [ExpirationReaper-0-Produce]: Starting > (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) > [2017-11-08 08:21:57,845] INFO [ExpirationReaper-0-DeleteRecords]: Starting > (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) > [2017-11-08 08:21:57,845] INFO [ExpirationReaper-0-Fetch]: Starting > (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper) > [2017-11-08 08:21:57,845] INFO [LogDirFailureHandler]: Starting > (kafka.server.ReplicaManager$LogDirFailureHandler) > [2017-11-08 08:21:57,860] INFO [ReplicaManager broker=0] Stopping serving > replicas in dir C:\Kafka\kafka_2.12-1.0.0\kafka-logs > (kafka.server.ReplicaManager) > [2017-11-08 08:21:57,860] INFO [ReplicaManager broker=0] Partitions are > offline due to failure on log directory C:\Kafka\kafka_2.12-1.0.0\kafka-logs > (kafka.server.ReplicaManager) > [2017-11-08 08:21:57,860] INFO [ReplicaFetcherManager on broker 0] Removed > fetcher for partitions (kafka.server.ReplicaFetcherManager) > [2017-11-08 08:21:57,892] INFO [ReplicaManager broker=0] Broker 0 stopped > fetcher for partitions because they are in the failed log dir > C:\Kafka\kafka_2.12-1.0.0\kafka-logs (kafka.server.ReplicaManager) > [2017-11-08 08:21:57,892] INFO Stopping serving logs in dir > C:\Kafka\kafka_2.12-1.0.0\kafka-logs (kafka.log.LogManager) > [2017-11-08 08:21:57,892] FATAL Shutdown broker because all log dirs in > C:\Kafka\kafka_2.12-1.0.0\kafka-logs have failed (kafka.log.LogManager) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6839) ZK session retry with cname record
[ https://issues.apache.org/jira/browse/KAFKA-6839?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16459559#comment-16459559 ] Edoardo Comar commented on KAFKA-6839: -- Java does DNS caching [https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/java-dg-jvm-ttl.html] > ZK session retry with cname record > -- > > Key: KAFKA-6839 > URL: https://issues.apache.org/jira/browse/KAFKA-6839 > Project: Kafka > Issue Type: Bug >Affects Versions: 1.1.0 >Reporter: Tyler Monahan >Priority: Major > > I have a 3 node kafka cluster setup in aws that talks to a 3 node zk cluster > behind an elb. I am giving the kafka instances a dns cname record that points > to the aws elb, which is itself a cname record pointing to two A records. When > the aws elb cname record changes the two A records it points at, and kafka > tries to reconnect to zk after losing a session, it uses the old A > records and not the new ones, so the reconnect attempt fails. There appears to > be some kind of caching instead of using the record that is set in the config > file. > This is the error message I am seeing in the broker logs. > {code:java} > [2018-04-30 20:09:21,449] INFO Opening socket connection to server > ip-10-65-68-244.us-west-2.compute.internal/10.65.68.244:2181. Will not > attempt to authenticate using SASL (unknown error) > (org.apache.zookeeper.ClientCnxn) > [2018-04-30 20:09:24,450] WARN Client session timed out, have not heard from > server in 3962ms for sessionid 0x263094512190001 > (org.apache.zookeeper.ClientCnxn) > [2018-04-30 20:09:24,451] INFO Client session timed out, have not heard from > server in 3962ms for sessionid 0x263094512190001, closing socket connection > and attempting reconnect (org.apache.zookeeper.ClientCnxn) > [2018-04-30 20:09:26,532] INFO Opening socket connection to server > ip-10-65-84-102.us-west-2.compute.internal/10.65.84.102:2181. 
Will not > attempt to authenticate using SASL (unknown error) > (org.apache.zookeeper.ClientCnxn) > [2018-04-30 20:09:29,531] WARN Session 0x263094512190001 for server null, > unexpected error, closing socket connection and attempting reconnect > (org.apache.zookeeper.ClientCnxn) > java.net.NoRouteToHostException: No route to host > at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) > at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) > at > org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361) > at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
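Edoardo's point about JVM DNS caching can be tuned from code: the `networkaddress.cache.ttl` security property controls how long successful lookups are cached (with a security manager installed the default is -1, i.e. cache forever, which matches the stale-A-record symptom above). A small sketch of lowering it so the client re-resolves the ZK CNAME; the 30-second value is just an example, and the property must be set before the first lookup to take effect:

```java
import java.security.Security;

public class DnsCacheTtl {
    public static void main(String[] args) {
        // Cache successful DNS lookups for at most 30 seconds instead of the
        // default (which can be "forever" depending on JVM configuration).
        Security.setProperty("networkaddress.cache.ttl", "30");
        // Failed lookups are cached separately; tune likewise if needed.
        Security.setProperty("networkaddress.cache.negative.ttl", "10");

        System.out.println(Security.getProperty("networkaddress.cache.ttl")); // prints "30"
    }
}
```

The same knob can be set JVM-wide without code changes via `$JAVA_SECURITY_FILE` / `java.security` overrides, which is often more practical for a broker deployment.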