[jira] [Commented] (KAFKA-6086) Provide for custom error handling when Kafka Streams fails to produce
[ https://issues.apache.org/jira/browse/KAFKA-6086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16316143#comment-16316143 ] ASF GitHub Bot commented on KAFKA-6086: --- dguy closed pull request #4395: MINOR: Add documentation for KAFKA-6086 (ProductionExceptionHandler) URL: https://github.com/apache/kafka/pull/4395 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/docs/streams/developer-guide/config-streams.html b/docs/streams/developer-guide/config-streams.html index dbac7fba2e0..256cc18b56f 100644 --- a/docs/streams/developer-guide/config-streams.html +++ b/docs/streams/developer-guide/config-streams.html @@ -69,6 +69,7 @@ Optional configuration parameters default.deserialization.exception.handler +default.production.exception.handler default.key.serde default.value.serde num.standby.replicas @@ -216,77 +217,82 @@ bootstrap.serversException handling class that implements the DeserializationExceptionHandler interface. 3 milliseconds - key.serde + default.production.exception.handler +Medium +Exception handling class that implements the ProductionExceptionHandler interface. +DefaultProductionExceptionHandler + + key.serde Medium Default serializer/deserializer class for record keys, implements the Serde interface (see also value.serde). Serdes.ByteArray().getClass().getName() - metric.reporters + metric.reporters Low A list of classes to use as metrics reporters. the empty list - metrics.num.samples + metrics.num.samples Low The number of samples maintained to compute metrics. 2 - metrics.recording.level + metrics.recording.level Low The highest recording level for metrics. INFO - metrics.sample.window.ms + metrics.sample.window.ms Low The window of time a metrics sample is computed over. 
30000 milliseconds - num.standby.replicas + num.standby.replicas Medium The number of standby replicas for each task. 0 - num.stream.threads + num.stream.threads Medium The number of threads to execute stream processing. 1 - partition.grouper + partition.grouper Low Partition grouper class that implements the PartitionGrouper interface. See Partition Grouper - poll.ms + poll.ms Low The amount of time in milliseconds to block waiting for input. 100 milliseconds - replication.factor + replication.factor High The replication factor for changelog topics and repartition topics created by the application. 1 - state.cleanup.delay.ms + state.cleanup.delay.ms Low The amount of time in milliseconds to wait before deleting state when a partition has migrated. 600000 milliseconds - state.dir + state.dir High Directory location for state stores. /var/lib/kafka-streams - timestamp.extractor + timestamp.extractor Medium Timestamp extractor class that implements the TimestampExtractor interface. See Timestamp Extractor - value.serde + value.serde Medium Default serializer/deserializer class for record values, implements the Serde interface (see also key.serde). Serdes.ByteArray().getClass().getName() - windowstore.changelog.additional.retention.ms + windowstore.changelog.additional.retention.ms Low Added to a windows maintainMs to ensure data is not deleted from the log prematurely. Allows for clock drift. 86400000 milliseconds = 1 day @@ -309,6 +315,44 @@ bootstrap.servers + default.production.exception.handler + +The default production exception handler allows you to manage exceptions triggered when trying to interact with a broker + such as attempting to produce a record that is too large.
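The diff above documents the new default.production.exception.handler config from KAFKA-6086. A minimal sketch of such a handler, based on the ProductionExceptionHandler interface that KIP-210 added (the class name here is illustrative, not part of Kafka), skips oversized records and fails on everything else:

```java
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;

// Illustrative handler: swallow oversized records, fail the streams client on anything else.
public class IgnoreRecordTooLargeHandler implements ProductionExceptionHandler {

    @Override
    public void configure(final Map<String, ?> configs) {
        // no configuration needed for this sketch
    }

    @Override
    public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
        if (exception instanceof RecordTooLargeException) {
            return ProductionExceptionHandlerResponse.CONTINUE; // drop the record, keep processing
        }
        return ProductionExceptionHandlerResponse.FAIL;         // stop the streams client
    }
}
```

It would be registered via the new config, e.g. `props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, IgnoreRecordTooLargeHandler.class);`.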
[jira] [Commented] (KAFKA-6260) AbstractCoordinator not clearly handles NULL Exception
[ https://issues.apache.org/jira/browse/KAFKA-6260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16316179#comment-16316179 ] Seweryn Habdank-Wojewodzki commented on KAFKA-6260: --- Short question: when will those fixes be released? :-) Unfortunately, the [Release|https://issues.apache.org/jira/projects/KAFKA?selectedItem=com.atlassian.jira.jira-projects-plugin:release-page] page shows no dates. > AbstractCoordinator not clearly handles NULL Exception > -- > > Key: KAFKA-6260 > URL: https://issues.apache.org/jira/browse/KAFKA-6260 > Project: Kafka > Issue Type: Bug >Affects Versions: 1.0.0 > Environment: RedHat Linux >Reporter: Seweryn Habdank-Wojewodzki >Assignee: Jason Gustafson > Fix For: 1.1.0, 1.0.1 > > > The error reporting is not clear, but it seems that the Kafka heartbeat shuts > down the application due to a NULL exception caused by "fake" disconnections. > One more comment: we are processing messages in the stream, but sometimes we > have to block processing for minutes, as the consumers cannot handle too much > load. Is it possible that while the stream is waiting, the heartbeat is > blocked as well? > Can you check that? 
> {code} > 2017-11-23 23:54:47 DEBUG AbstractCoordinator:177 - [Consumer > clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, > groupId=kafka-endpoint] Received successful Heartbeat response > 2017-11-23 23:54:50 DEBUG AbstractCoordinator:183 - [Consumer > clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, > groupId=kafka-endpoint] Sending Heartbeat request to coordinator > cljp01.eb.lan.at:9093 (id: 2147483646 rack: null) > 2017-11-23 23:54:50 TRACE NetworkClient:135 - [Consumer > clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, > groupId=kafka-endpoint] Sending HEARTBEAT > {group_id=kafka-endpoint,generation_id=3834,member_id=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer-94f18be5-e49a-4817-9e5a-fe82a64e0b08} > with correlation id 24 to node 2147483646 > 2017-11-23 23:54:50 TRACE NetworkClient:135 - [Consumer > clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, > groupId=kafka-endpoint] Completed receive from node 2147483646 for HEARTBEAT > with correlation id 24, received {throttle_time_ms=0,error_code=0} > 2017-11-23 23:54:50 DEBUG AbstractCoordinator:177 - [Consumer > clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, > groupId=kafka-endpoint] Received successful Heartbeat response > 2017-11-23 23:54:52 DEBUG NetworkClient:183 - [Consumer > clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, > groupId=kafka-endpoint] Disconnecting from node 1 due to request timeout. 
> 2017-11-23 23:54:52 TRACE NetworkClient:135 - [Consumer > clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, > groupId=kafka-endpoint] Cancelled request > {replica_id=-1,max_wait_time=6000,min_bytes=1,max_bytes=52428800,isolation_level=0,topics=[{topic=clj_internal_topic,partitions=[{partition=6,fetch_offset=211558395,log_start_offset=-1,max_bytes=1048576},{partition=8,fetch_offset=210178209,log_start_offset=-1,max_bytes=1048576},{partition=0,fetch_offset=209353523,log_start_offset=-1,max_bytes=1048576},{partition=2,fetch_offset=209291462,log_start_offset=-1,max_bytes=1048576},{partition=4,fetch_offset=210728595,log_start_offset=-1,max_bytes=1048576}]}]} > with correlation id 21 due to node 1 being disconnected > 2017-11-23 23:54:52 DEBUG ConsumerNetworkClient:195 - [Consumer > clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, > groupId=kafka-endpoint] Cancelled FETCH request RequestHeader(apiKey=FETCH, > apiVersion=6, > clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, > correlationId=21) with correlation id 21 due to node 1 being disconnected > 2017-11-23 23:54:52 DEBUG Fetcher:195 - [Consumer > clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, > groupId=kafka-endpoint] Fetch request > {clj_internal_topic-6=(offset=211558395, logStartOffset=-1, > maxBytes=1048576), clj_internal_topic-8=(offset=210178209, logStartOffset=-1, > maxBytes=1048576), clj_internal_topic-0=(offset=209353523, logStartOffset=-1, > maxBytes=1048576), clj_internal_topic-2=(offset=209291462, logStartOffset=-1, > maxBytes=1048576), clj_internal_topic-4=(offset=210728595, logStartOffset=-1, > maxBytes=1048576)} to cljp01.eb.lan.at:9093 (id: 1 rack: DC-1) failed > org.apache.kafka.common.errors.DisconnectException: null > 2017-11-23 23:54:52 TRACE NetworkClient:123 - [Consumer > clientId=kafk
[jira] [Resolved] (KAFKA-4908) consumer.properties logging warnings
[ https://issues.apache.org/jira/browse/KAFKA-4908?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Seweryn Habdank-Wojewodzki resolved KAFKA-4908. --- Resolution: Done Not an issue for me anymore. > consumer.properties logging warnings > > > Key: KAFKA-4908 > URL: https://issues.apache.org/jira/browse/KAFKA-4908 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.10.2.0 >Reporter: Seweryn Habdank-Wojewodzki >Priority: Minor > > The default consumer.properties delivered with the Kafka package logs > warnings at startup of the console consumer: > [2017-03-15 16:36:57,439] WARN The configuration > 'zookeeper.connection.timeout.ms' was supplied but isn't a known config. > (org.apache.kafka.clients.consumer.ConsumerConfig) > [2017-03-15 16:36:57,455] WARN The configuration 'zookeeper.connect' was > supplied but isn't a known config. > (org.apache.kafka.clients.consumer.ConsumerConfig) -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (KAFKA-4908) consumer.properties logging warnings
[ https://issues.apache.org/jira/browse/KAFKA-4908?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Seweryn Habdank-Wojewodzki closed KAFKA-4908. - > consumer.properties logging warnings > > > Key: KAFKA-4908 > URL: https://issues.apache.org/jira/browse/KAFKA-4908 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.10.2.0 >Reporter: Seweryn Habdank-Wojewodzki >Priority: Minor > > The default consumer.properties delivered with the Kafka package logs > warnings at startup of the console consumer: > [2017-03-15 16:36:57,439] WARN The configuration > 'zookeeper.connection.timeout.ms' was supplied but isn't a known config. > (org.apache.kafka.clients.consumer.ConsumerConfig) > [2017-03-15 16:36:57,455] WARN The configuration 'zookeeper.connect' was > supplied but isn't a known config. > (org.apache.kafka.clients.consumer.ConsumerConfig) -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (KAFKA-4315) Kafka Connect documentation problems
[ https://issues.apache.org/jira/browse/KAFKA-4315?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Seweryn Habdank-Wojewodzki resolved KAFKA-4315. --- Resolution: Done I do not care about this matter anymore. > Kafka Connect documentation problems > > > Key: KAFKA-4315 > URL: https://issues.apache.org/jira/browse/KAFKA-4315 > Project: Kafka > Issue Type: Bug >Reporter: Seweryn Habdank-Wojewodzki > > Based on the documentation of Kafka Connect - > http://kafka.apache.org/documentation#connect, I tried to build the example > in Java. It was not possible. > The code pieces available on the webpage are taken out of context and > do not compile. > It also seems they are taken from entirely different software parts, so > even putting them together shows that they do not build any reasonable > example, and they tend to be very complex, where I would expect the API > examples to drive "Hello World"-like code. > There are also only weak connections between the examples in the Kafka documentation > and the Kafka Connect tool code available in the Kafka source. > Finally, it would be nice to have a statement in the Kafka > documentation about which parts of the API are stable and which are unstable or > experimental. > I saw many (~20) such remarks in the Kafka code - I mean notes that the API is > unstable. This note is very important, as we plan additional effort to > prepare facades for unstable code. > In my opinion there is nothing wrong with an experimental API, but all such matters > shall be well documented. The current state of the main > Kafka documentation gives the impression that Kafka Connect is a well-tested, > consistent and stable feature set, but it is not, which leads to confusion in > effort management. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (KAFKA-4315) Kafka Connect documentation problems
[ https://issues.apache.org/jira/browse/KAFKA-4315?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Seweryn Habdank-Wojewodzki closed KAFKA-4315. - > Kafka Connect documentation problems > > > Key: KAFKA-4315 > URL: https://issues.apache.org/jira/browse/KAFKA-4315 > Project: Kafka > Issue Type: Bug >Reporter: Seweryn Habdank-Wojewodzki > > Based on the documentation of Kafka Connect - > http://kafka.apache.org/documentation#connect, I tried to build the example > in Java. It was not possible. > The code pieces available on the webpage are taken out of context and > do not compile. > It also seems they are taken from entirely different software parts, so > even putting them together shows that they do not build any reasonable > example, and they tend to be very complex, where I would expect the API > examples to drive "Hello World"-like code. > There are also only weak connections between the examples in the Kafka documentation > and the Kafka Connect tool code available in the Kafka source. > Finally, it would be nice to have a statement in the Kafka > documentation about which parts of the API are stable and which are unstable or > experimental. > I saw many (~20) such remarks in the Kafka code - I mean notes that the API is > unstable. This note is very important, as we plan additional effort to > prepare facades for unstable code. > In my opinion there is nothing wrong with an experimental API, but all such matters > shall be well documented. The current state of the main > Kafka documentation gives the impression that Kafka Connect is a well-tested, > consistent and stable feature set, but it is not, which leads to confusion in > effort management. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-3039) Temporary loss of leader resulted in log being completely truncated
[ https://issues.apache.org/jira/browse/KAFKA-3039?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16316265#comment-16316265 ] Vincent Da commented on KAFKA-3039: --- We have also experienced this on a Kafka server for many __consumer_offsets-X topic partitions (and also on regular ones) on a new Kafka 0.11.0.2: {noformat} /usr/local/kafka/logs/server.log.2018-01-06-13-[2018-01-06 13:27:35,568] WARN [ReplicaFetcherThread-0-3]: Based on follower's leader epoch, leader replied with an unknown offset in tests-1. High watermark 0 will be used for truncation. (kafka.server.ReplicaFetcherThread) /usr/local/kafka/logs/server.log.2018-01-06-13:[2018-01-06 13:27:35,568] INFO Truncating log __consumer_offsets-19 to offset 0. (kafka.log.Log) {noformat} > Temporary loss of leader resulted in log being completely truncated > --- > > Key: KAFKA-3039 > URL: https://issues.apache.org/jira/browse/KAFKA-3039 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.9.0.0 > Environment: Debian 3.2.54-2 x86_64 GNU/Linux >Reporter: Imran Patel >Priority: Critical > Labels: reliability > > We had an event recently where the temporary loss of a leader for a > partition (during a manual restart) resulted in the leader coming back with > no high watermark state and truncating its log to zero. Logs (attached below) > indicate that it did have the data but not the commit state. How is this > possible? > Leader (broker 3) > [2015-12-18 21:19:44,666] INFO Completed load of log messages-14 with log end > offset 14175963374 (kafka.log.Log) > [2015-12-18 21:19:45,170] INFO Partition [messages,14] on broker 3: No > checkpointed highwatermark is found for partition [messages,14] > (kafka.cluster.Partition) > [2015-12-18 21:19:45,238] INFO Truncating log messages-14 to offset 0. 
> (kafka.log.Log) > [2015-12-18 21:20:34,066] INFO Partition [messages,14] on broker 3: Expanding > ISR for partition [messages,14] from 3 to 3,10 (kafka.cluster.Partition) > Replica (broker 10) > [2015-12-18 21:19:19,525] INFO Partition [messages,14] on broker 10: > Shrinking ISR for partition [messages,14] from 3,10,4 to 10,4 > (kafka.cluster.Partition) > [2015-12-18 21:20:34,049] ERROR [ReplicaFetcherThread-0-3], Current offset > 14175984203 for partition [messages,14] out of range; reset offset to 35977 > (kafka.server.ReplicaFetcherThread) > [2015-12-18 21:20:34,033] WARN [ReplicaFetcherThread-0-3], Replica 10 for > partition [messages,14] reset its fetch offset from 14175984203 to current > leader 3's latest offset 35977 (kafka.server.ReplicaFetcherThread) > Some relevant config parameters: > offsets.topic.replication.factor = 3 > offsets.commit.required.acks = -1 > replica.high.watermark.checkpoint.interval.ms = 5000 > unclean.leader.election.enable = false -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-6369) General wildcard support for ACL's in kafka
[ https://issues.apache.org/jira/browse/KAFKA-6369?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16316649#comment-16316649 ] Luiz Picanço commented on KAFKA-6369: - Seems related to KAFKA-5713 > General wildcard support for ACL's in kafka > --- > > Key: KAFKA-6369 > URL: https://issues.apache.org/jira/browse/KAFKA-6369 > Project: Kafka > Issue Type: New Feature >Reporter: Antony Stubbs > > Especially for streams apps where all intermediate topics are prefixed with > the application id. > For example, add read and write access to mystreamsapp.* so any new topics > created by the app don't need to have specific permissions applied to them. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-6422) When enable trace level log in mirror maker, it will throw null pointer exception and the mirror maker will shutdown
[ https://issues.apache.org/jira/browse/KAFKA-6422?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16316749#comment-16316749 ] James Cheng commented on KAFKA-6422: I have no ability to merge PRs, but the change looks good to me. > When enable trace level log in mirror maker, it will throw null pointer > exception and the mirror maker will shutdown > > > Key: KAFKA-6422 > URL: https://issues.apache.org/jira/browse/KAFKA-6422 > Project: Kafka > Issue Type: Bug > Components: tools >Affects Versions: 0.10.0.0, 0.10.1.0, 0.10.2.0, 0.11.0.0, 0.11.0.1, > 0.11.0.2 >Reporter: Xin Li >Assignee: Xin Li >Priority: Minor > Labels: easyfix > Fix For: 0.11.0.0 > > > https://github.com/apache/kafka/blob/0.10.0/core/src/main/scala/kafka/tools/MirrorMaker.scala#L414 > When trace-level logging is enabled in MirrorMaker and the message value is > null, it throws a NullPointerException, and MirrorMaker shuts down because of > that. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
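The fix implied by the ticket is to guard the possibly-null value before dereferencing it in the trace statement. A minimal null-safe formatting helper of that kind (this is our sketch, not the actual MirrorMaker patch; class and method names are hypothetical) could look like:

```java
public final class TraceFormat {
    // Hypothetical helper: render a possibly-null record value for a trace log line
    // instead of unconditionally calling value.length, which is the NPE in the ticket.
    static String describeValue(final byte[] value) {
        return value == null ? "<null>" : value.length + " bytes";
    }

    private TraceFormat() { }
}
```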
[jira] [Commented] (KAFKA-6387) Worker's producer and consumer configs should inherit from worker configs
[ https://issues.apache.org/jira/browse/KAFKA-6387?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16316820#comment-16316820 ] Randall Hauch commented on KAFKA-6387: -- There are a number of cases where this behavior might actually change how the producers and consumers used for connectors are configured. Consider a worker configuration that includes the following configuration properties: {code} retries=2 max.partition.fetch.bytes=262144 {code} This configuration file does not define "{{consumer.max.partition.fetch.bytes}}" or "{{producer.max.partition.fetch.bytes}}", so currently these would default to 1048576. However, if this proposed change were implemented, the "{{consumer.max.partition.fetch.bytes}}" and "{{producer.max.partition.fetch.bytes}}" values would be the inherited value of 262144, not the default 1048576. In short, implementing this change would break backward compatibility. We could implement a configuration switch that controls whether the configurations are inherited, but this adds complexity to the already-complex configuration mechanism. > Worker's producer and consumer configs should inherit from worker configs > - > > Key: KAFKA-6387 > URL: https://issues.apache.org/jira/browse/KAFKA-6387 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 1.0.0 >Reporter: Randall Hauch > Labels: needs-kip > > Currently, the worker configuration file defines the connection properties > for the three separate types of connections being made to the Kafka cluster: > # the worker group membership, > # producers for source connectors, > # the consumers for sink connectors. > The configs are namespaced in order to properly support things like > interceptors, where the configs for 2 and 3 would conflict (same config name, > different value). 
> However, when such control is not required, it would be beneficial for the > producers and consumers to inherit the top-level configurations, yet be able > to override them with the {{producer.}} and {{consumer.}} namespaced > configurations. This way the producer- and consumer-specific configurations > need only be specified if/when they need to override the top-level > configurations. This may be necessary, for example, for the connector tasks > to have different ACLs than the producers and consumers. > This will require a minimal KIP to explain the new behavior. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
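The precedence the proposal describes (prefixed keys override inherited top-level values) can be sketched as a resolution step over the worker properties. The helper below is hypothetical, only illustrating the behavior under discussion, and is not Connect's actual configuration code:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of the proposed inheritance for the producer config:
// start from the worker's top-level properties, then let "producer."-prefixed
// keys override the inherited values.
public class ConfigInheritance {
    static Map<String, String> effectiveProducerConfig(final Map<String, String> worker) {
        final Map<String, String> result = new HashMap<>();
        for (final Map.Entry<String, String> e : worker.entrySet()) {
            if (!e.getKey().startsWith("producer.") && !e.getKey().startsWith("consumer.")) {
                result.put(e.getKey(), e.getValue()); // inherited top-level value
            }
        }
        for (final Map.Entry<String, String> e : worker.entrySet()) {
            if (e.getKey().startsWith("producer.")) {
                // explicit producer override wins over the inherited value
                result.put(e.getKey().substring("producer.".length()), e.getValue());
            }
        }
        return result;
    }
}
```

Under this scheme the worker's `retries=2` would flow into the producer config unless `producer.retries` is set, which is exactly the compatibility change the comment above warns about.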
[jira] [Resolved] (KAFKA-6387) Worker's producer and consumer configs should inherit from worker configs
[ https://issues.apache.org/jira/browse/KAFKA-6387?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch resolved KAFKA-6387. -- Resolution: Won't Fix Because of the inability to maintain backward compatibility of the behavior, I'm withdrawing this request and marking this issue as WONTFIX. > Worker's producer and consumer configs should inherit from worker configs > - > > Key: KAFKA-6387 > URL: https://issues.apache.org/jira/browse/KAFKA-6387 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 1.0.0 >Reporter: Randall Hauch > Labels: needs-kip > > Currently, the worker configuration file defines the connection properties > for the three separate types of connections being made to the Kafka cluster: > # the worker group membership, > # producers for source connectors, > # the consumers for sink connectors. > The configs are namespaced in order to properly support things like > interceptors, where the configs for 2 and 3 would conflict (same config name, > different value). > However, when such control is not required, it would be beneficial for the > producers and consumers to inherit the top-level configurations, yet be able > to override them with the {{producer.}} and {{consumer.}} namespaced > configurations. This way the producer- and consumer-specific configurations > need only be specified if/when they need to override the top-level > configurations. This may be necessary, for example, for the connector tasks > to have different ACLs than the producers and consumers. > This will require a minimal KIP to explain the new behavior. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Comment Edited] (KAFKA-6387) Worker's producer and consumer configs should inherit from worker configs
[ https://issues.apache.org/jira/browse/KAFKA-6387?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16316820#comment-16316820 ] Randall Hauch edited comment on KAFKA-6387 at 1/8/18 7:25 PM: -- Created https://cwiki.apache.org/confluence/display/KAFKA/KIP-246%3A+Connect+producers+and+consumers+should+inherit+worker+configs. However, while writing up the KIP I discovered that there are a number of cases where this behavior might actually change how the producers and consumers used for connectors are configured. Consider a worker configuration that includes the following configuration properties: {code} retries=2 max.partition.fetch.bytes=262144 {code} This configuration file does not define "{{consumer.max.partition.fetch.bytes}}" or "{{producer.max.partition.fetch.bytes}}", so currently these would default to 1048576. However, if this proposed change were implemented, the "{{consumer.max.partition.fetch.bytes}}" and "{{producer.max.partition.fetch.bytes}}" values would be the inherited value of 262144, not the default 1048576. In short, implementing this change would break backward compatibility. We could implement a configuration switch that controls whether the configurations are inherited, but this adds complexity to the already-complex configuration mechanism. was (Author: rhauch): There are a number of cases where this behavior might actually change how the producers and consumers used for connectors are configured. Consider a worker configuration that includes the following configuration properties: {code} retries=2 max.partition.fetch.bytes=262144 {code} This configuration file does not define "{{consumer.max.partition.fetch.bytes}}" or "{{producer.max.partition.fetch.bytes}}", so currently these would default to 1048576. However, if this proposed change were implemented, the "{{consumer.max.partition.fetch.bytes}}" and "{{producer.max.partition.fetch.bytes}}" values would be the inherited value of 262144, not the default 1048576. In short, implementing this change would break backward compatibility. We could implement a configuration switch that controls whether the configurations are inherited, but this adds complexity to the already-complex configuration mechanism. > Worker's producer and consumer configs should inherit from worker configs > - > > Key: KAFKA-6387 > URL: https://issues.apache.org/jira/browse/KAFKA-6387 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 1.0.0 >Reporter: Randall Hauch > Labels: needs-kip > > Currently, the worker configuration file defines the connection properties > for the three separate types of connections being made to the Kafka cluster: > # the worker group membership, > # producers for source connectors, > # the consumers for sink connectors. > The configs are namespaced in order to properly support things like > interceptors, where the configs for 2 and 3 would conflict (same config name, > different value). > However, when such control is not required, it would be beneficial for the > producers and consumers to inherit the top-level configurations, yet be able > to override them with the {{producer.}} and {{consumer.}} namespaced > configurations. This way the producer- and consumer-specific configurations > need only be specified if/when they need to override the top-level > configurations. This may be necessary, for example, for the connector tasks > to have different ACLs than the producers and consumers. > This will require a minimal KIP to explain the new behavior. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (KAFKA-6412) Improve synchronization in CachingKeyValueStore methods
[ https://issues.apache.org/jira/browse/KAFKA-6412?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated KAFKA-6412: -- Attachment: 6412-jmh.v1.txt > Improve synchronization in CachingKeyValueStore methods > --- > > Key: KAFKA-6412 > URL: https://issues.apache.org/jira/browse/KAFKA-6412 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Ted Yu > Attachments: 6412-jmh.v1.txt, k-6412.v1.txt > > > Currently, CachingKeyValueStore methods are synchronized at the method level. > It seems we can use a read lock for getters and a write lock for put/delete > methods. > For getInternal(), if the underlying thread is the streamThread, > getInternal() may trigger eviction. This can be handled by obtaining the > write lock at the beginning of the method for the streamThread. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
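The locking scheme the ticket suggests (shared read lock for getters, exclusive write lock for put/delete) can be sketched with a ReentrantReadWriteLock. This is an illustrative in-memory store, not the actual CachingKeyValueStore, and it omits the streamThread eviction caveat the ticket raises:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Illustrative store: many readers may hold the read lock concurrently,
// while put/delete take the exclusive write lock.
public class ReadWriteLockedStore<K, V> {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final Map<K, V> map = new HashMap<>();

    public V get(final K key) {
        lock.readLock().lock();
        try {
            return map.get(key);
        } finally {
            lock.readLock().unlock();
        }
    }

    public void put(final K key, final V value) {
        lock.writeLock().lock();
        try {
            map.put(key, value);
        } finally {
            lock.writeLock().unlock();
        }
    }

    public V delete(final K key) {
        lock.writeLock().lock();
        try {
            return map.remove(key);
        } finally {
            lock.writeLock().unlock();
        }
    }
}
```

As the ticket notes, a getter that can trigger cache eviction mutates state, so that path would need the write lock up front rather than the read lock.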
[jira] [Updated] (KAFKA-6424) QueryableStateIntegrationTest#queryOnRebalance should accept raw text
[ https://issues.apache.org/jira/browse/KAFKA-6424?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-6424: - Component/s: streams > QueryableStateIntegrationTest#queryOnRebalance should accept raw text > > > Key: KAFKA-6424 > URL: https://issues.apache.org/jira/browse/KAFKA-6424 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Ted Yu >Priority: Minor > > I was using QueryableStateIntegrationTest#queryOnRebalance for some > performance testing by adding more sentences to inputValues. > I found that when a sentence contains an upper-case letter, the test times > out. > I worked around this limitation by calling {{sentence.toLowerCase(Locale.ROOT)}} > before the split. > Ideally we could specify the path to a text file which contains the text. The > test could then read the text file and generate the input array. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (KAFKA-6424) QueryableStateIntegrationTest#queryOnRebalance should accept raw text
[ https://issues.apache.org/jira/browse/KAFKA-6424?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-6424: - Labels: newbie unit-test (was: ) > QueryableStateIntegrationTest#queryOnRebalance should accept raw text > > > Key: KAFKA-6424 > URL: https://issues.apache.org/jira/browse/KAFKA-6424 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Ted Yu >Priority: Minor > Labels: newbie, unit-test > > I was using QueryableStateIntegrationTest#queryOnRebalance for some > performance testing by adding more sentences to inputValues. > I found that when a sentence contains an upper-case letter, the test times > out. > I worked around this limitation by calling {{sentence.toLowerCase(Locale.ROOT)}} > before the split. > Ideally we could specify the path to a text file which contains the text. The > test could then read the text file and generate the input array. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
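The workaround mentioned in the ticket can be sketched as follows (the class and method names are ours, not the test's):

```java
import java.util.Locale;

public final class SentenceInput {
    // The ticket's workaround: normalize case before splitting into words, so input
    // sentences containing upper-case letters do not change the expected word counts.
    static String[] toWords(final String sentence) {
        return sentence.toLowerCase(Locale.ROOT).split("\\W+");
    }

    private SentenceInput() { }
}
```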
[jira] [Commented] (KAFKA-6363) Use MockAdminClient for any unit tests that depend on AdminClient
[ https://issues.apache.org/jira/browse/KAFKA-6363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16316912#comment-16316912 ] ASF GitHub Bot commented on KAFKA-6363: --- guozhangwang closed pull request #4371: KAFKA-6363: Use MockAdminClient for any unit tests that depend on Adm… URL: https://github.com/apache/kafka/pull/4371 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockKafkaAdminClientEnv.java b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java similarity index 91% rename from clients/src/test/java/org/apache/kafka/clients/admin/MockKafkaAdminClientEnv.java rename to clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java index cca35ac22c9..10281fb6ffa 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/MockKafkaAdminClientEnv.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java @@ -35,21 +35,21 @@ * * When finished, be sure to {@link #close() close} the environment object. 
*/ -public class MockKafkaAdminClientEnv implements AutoCloseable { +public class AdminClientUnitTestEnv implements AutoCloseable { private final Time time; private final Cluster cluster; private final MockClient mockClient; private final KafkaAdminClient adminClient; -public MockKafkaAdminClientEnv(Cluster cluster, String...vals) { +public AdminClientUnitTestEnv(Cluster cluster, String...vals) { this(Time.SYSTEM, cluster, vals); } -public MockKafkaAdminClientEnv(Time time, Cluster cluster, String...vals) { +public AdminClientUnitTestEnv(Time time, Cluster cluster, String...vals) { this(time, cluster, newStrMap(vals)); } -public MockKafkaAdminClientEnv(Time time, Cluster cluster, Map config) { +public AdminClientUnitTestEnv(Time time, Cluster cluster, Map config) { this.time = time; this.cluster = cluster; AdminClientConfig adminClientConfig = new AdminClientConfig(config); diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index c0fe73c36ed..84588a9f3be 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -38,10 +38,10 @@ import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.requests.CreatePartitionsResponse; import org.apache.kafka.common.requests.ApiError; import org.apache.kafka.common.requests.CreateAclsResponse; import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse; +import org.apache.kafka.common.requests.CreatePartitionsResponse; import org.apache.kafka.common.requests.CreateTopicsResponse; import org.apache.kafka.common.requests.DeleteAclsResponse; import org.apache.kafka.common.requests.DeleteAclsResponse.AclDeletionResult; @@ -75,8 
+75,8 @@ import java.util.concurrent.Future; import static java.util.Arrays.asList; -import static org.apache.kafka.common.requests.ResourceType.TOPIC; import static org.apache.kafka.common.requests.ResourceType.BROKER; +import static org.apache.kafka.common.requests.ResourceType.TOPIC; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -155,7 +155,7 @@ public void testGenerateClientId() { KafkaAdminClient.generateClientId(newConfMap(AdminClientConfig.CLIENT_ID_CONFIG, "myCustomId"))); } -private static MockKafkaAdminClientEnv mockClientEnv(String... configVals) { +private static AdminClientUnitTestEnv mockClientEnv(String... configVals) { HashMap nodes = new HashMap<>(); nodes.put(0, new Node(0, "localhost", 8121)); nodes.put(1, new Node(1, "localhost", 8122)); @@ -163,12 +163,12 @@ private static MockKafkaAdminClientEnv mockClientEnv(String... configVals) { Cluster cluster = new Cluster("mockClusterId", nodes.values(), Collections.emptySet(), Collections.emptySet(), Collections.emptySet(), nodes.get(0)); -return new MockKafkaAdminClientEnv(cluster, configVals); +return new AdminClientUnitTestEnv(cluster, configVals); } @Test public void testCloseAdminClient() throws Exception { -try (MockKafkaAdminClientEnv env = mockCl
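The renamed environment class relies on the try-with-resources idiom noted in its Javadoc ("be sure to close the environment object"). A minimal, hypothetical stand-in (not the actual AdminClientUnitTestEnv) shows the shape of that pattern:

```java
public class TestEnvDemo {

    // Hypothetical stand-in for a unit-test environment such as
    // AdminClientUnitTestEnv: resources are acquired up front and
    // released deterministically in close().
    static class UnitTestEnv implements AutoCloseable {
        private boolean closed = false;

        boolean isClosed() {
            return closed;
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    public static void main(String[] args) {
        UnitTestEnv observed = null;
        // try-with-resources guarantees close() runs even if the test body
        // throws, which is why such test helpers implement AutoCloseable.
        try (UnitTestEnv env = new UnitTestEnv()) {
            observed = env;
            // ... exercise the mocked admin client here ...
        }
        System.out.println(observed.isClosed()); // prints "true"
    }
}
```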
[jira] [Updated] (KAFKA-3625) Move kafka-streams test fixtures into a published package
[ https://issues.apache.org/jira/browse/KAFKA-3625?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-3625: --- Description: The KStreamTestDriver and related fixtures defined in streams/src/test/java/org/apache/kafka/test would be useful to developers building applications on top of Kafka Streams, but they are not currently exposed in a package. I propose moving this directory to live under streams/fixtures/src/main and creating a new 'streams:fixtures' project in the gradle configuration to publish these as a separate package. KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-247%3A+Add+public+test+utils+for+Kafka+Streams was: The KStreamTestDriver and related fixtures defined in streams/src/test/java/org/apache/kafka/test would be useful to developers building applications on top of Kafka Streams, but they are not currently exposed in a package. I propose moving this directory to live under streams/fixtures/src/main and creating a new 'streams:fixtures' project in the gradle configuration to publish these as a separate package. > Move kafka-streams test fixtures into a published package > - > > Key: KAFKA-3625 > URL: https://issues.apache.org/jira/browse/KAFKA-3625 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Jeff Klukas >Assignee: Matthias J. Sax >Priority: Minor > Labels: needs-kip, user-experience > > The KStreamTestDriver and related fixtures defined in > streams/src/test/java/org/apache/kafka/test would be useful to developers > building applications on top of Kafka Streams, but they are not currently > exposed in a package. > I propose moving this directory to live under streams/fixtures/src/main and > creating a new 'streams:fixtures' project in the gradle configuration to > publish these as a separate package. 
> KIP: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-247%3A+Add+public+test+utils+for+Kafka+Streams -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-3625) Move kafka-streams test fixtures into a published package
[ https://issues.apache.org/jira/browse/KAFKA-3625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16317097#comment-16317097 ] ASF GitHub Bot commented on KAFKA-3625: --- mjsax opened a new pull request #4402: KAFKA-3625: Add public test utils for Kafka Streams [WIP] URL: https://github.com/apache/kafka/pull/4402 - add new artifact test-utils - add TopologyTestDriver - add MockTime, TestRecord, add TestRecordFactory This PR requires a KIP and is WIP. DO NOT MERGE. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Move kafka-streams test fixtures into a published package > - > > Key: KAFKA-3625 > URL: https://issues.apache.org/jira/browse/KAFKA-3625 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Jeff Klukas >Assignee: Matthias J. Sax >Priority: Minor > Labels: needs-kip, user-experience > > The KStreamTestDriver and related fixtures defined in > streams/src/test/java/org/apache/kafka/test would be useful to developers > building applications on top of Kafka Streams, but they are not currently > exposed in a package. > I propose moving this directory to live under streams/fixtures/src/main and > creating a new 'streams:fixtures' project in the gradle configuration to > publish these as a separate package. > KIP: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-247%3A+Add+public+test+utils+for+Kafka+Streams -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-6428) Fail builds on findbugs warnings
[ https://issues.apache.org/jira/browse/KAFKA-6428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16317170#comment-16317170 ] ASF GitHub Bot commented on KAFKA-6428: --- ewencp closed pull request #4398: KAFKA-6428: Generate findbugs output for CI and fail builds for 'high' level warnings URL: https://github.com/apache/kafka/pull/4398 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/README.md b/README.md index 9371e482169..7db602325c8 100644 --- a/README.md +++ b/README.md @@ -182,13 +182,16 @@ The checkstyle warnings will be found in `reports/checkstyle/reports/main.html` subproject build directories. They are also printed to the console. The build will fail if Checkstyle fails. Findbugs -Findbugs uses static analysis to look for bugs in the code. +Findbugs uses static analysis to look for bugs in the code. Findbugs is executed as part of the normal build process +and will be included in CI testing. Normally XML reports are generated, which are not very human-readable unless processed +by Jenkins. If you want to check Findbugs results during development, run it so it generates HTML output: + You can run findbugs using: -./gradlew findbugsMain findbugsTest -x test +./gradlew findbugsMain findbugsTest -x test -PhtmlFindBugsReport=true The findbugs warnings will be found in `reports/findbugs/main.html` and `reports/findbugs/test.html` files in the subproject build -directories. Use -PxmlFindBugsReport=true to generate an XML report instead of an HTML one. +directories. 
### Common build options ### diff --git a/build.gradle b/build.gradle index 725cf0b8bb2..bdbf0fbe6bb 100644 --- a/build.gradle +++ b/build.gradle @@ -371,13 +371,14 @@ subprojects { toolVersion = "3.0.1" excludeFilter = file("$rootDir/gradle/findbugs-exclude.xml") ignoreFailures = false + reportLevel = "high" } test.dependsOn('findbugsMain') tasks.withType(FindBugs) { reports { -xml.enabled(project.hasProperty('xmlFindBugsReport')) -html.enabled(!project.hasProperty('xmlFindBugsReport')) +xml.enabled(!project.hasProperty('htmlFindBugsReport')) +html.enabled(project.hasProperty('htmlFindBugsReport')) } } } This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Fail builds on findbugs warnings > > > Key: KAFKA-6428 > URL: https://issues.apache.org/jira/browse/KAFKA-6428 > Project: Kafka > Issue Type: Improvement > Components: build >Reporter: Ewen Cheslack-Postava >Assignee: Ewen Cheslack-Postava > > Findbugs spots likely bugs, and especially for warnings at the High level, it > actually has pretty good signal for real bugs (or just things that might be > risky). We should be failing builds, especially PRs, if any sufficiently high > warnings are listed. We should get this enabled for that level and then > decide if we want to adjust the level of warnings we want to address. > This likely relates to KAFKA-5887 since findbugs may not be sufficiently > maintained for JDK9 support. In any case, the intent is to fail the build > based on whichever tool is used. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (KAFKA-6428) Fail builds on findbugs warnings
[ https://issues.apache.org/jira/browse/KAFKA-6428?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava resolved KAFKA-6428. -- Resolution: Invalid Seems it was already set up to report and fail; the reporting on Jenkins was just missing. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (KAFKA-6398) Non-aggregation KTable generation operator does not construct value getter correctly
[ https://issues.apache.org/jira/browse/KAFKA-6398?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-6398: - Summary: Non-aggregation KTable generation operator does not construct value getter correctly (was: Stream-Table join fails, if table is not materialized) > Non-aggregation KTable generation operator does not construct value getter > correctly > > > Key: KAFKA-6398 > URL: https://issues.apache.org/jira/browse/KAFKA-6398 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.11.0.1, 1.0.0 >Reporter: Matthias J. Sax >Assignee: Guozhang Wang > Labels: bug > > Using a non-materialized KTable in a stream-table join fails: > {noformat} > final KTable filteredKTable = builder.table("table-topic").filter(...); > builder.stream("stream-topic").join(filteredKTable,...); > {noformat} > fails with > {noformat} > org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology > building: StateStore null is not added yet. 
> at > org.apache.kafka.streams.processor.TopologyBuilder.connectProcessorAndStateStore(TopologyBuilder.java:1021) > at > org.apache.kafka.streams.processor.TopologyBuilder.connectProcessorAndStateStores(TopologyBuilder.java:949) > at > org.apache.kafka.streams.kstream.internals.KStreamImpl.doStreamTableJoin(KStreamImpl.java:621) > at > org.apache.kafka.streams.kstream.internals.KStreamImpl.join(KStreamImpl.java:577) > at > org.apache.kafka.streams.kstream.internals.KStreamImpl.join(KStreamImpl.java:563) > {noformat} > Adding a store name is not sufficient as workaround but fails differently: > {noformat} > final KTable filteredKTable = builder.table("table-topic").filter(..., > "STORE-NAME"); > builder.stream("stream-topic").join(filteredKTable,...); > {noformat} > error: > {noformat} > org.apache.kafka.streams.errors.StreamsException: failed to initialize > processor KSTREAM-JOIN-05 > at > org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:113) > at > org.apache.kafka.streams.processor.internals.StreamTask.initTopology(StreamTask.java:339) > at > org.apache.kafka.streams.processor.internals.StreamTask.initialize(StreamTask.java:153) > Caused by: org.apache.kafka.streams.errors.TopologyBuilderException: Invalid > topology building: Processor KSTREAM-JOIN-05 has no access to > StateStore KTABLE-SOURCE-STATE-STORE-00 > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.getStateStore(ProcessorContextImpl.java:69) > at > org.apache.kafka.streams.kstream.internals.KTableSourceValueGetterSupplier$KTableSourceValueGetter.init(KTableSourceValueGetterSupplier.java:45) > at > org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterValueGetter.init(KTableFilter.java:121) > at > org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.init(KStreamKTableJoinProcessor.java:44) > at > org.apache.kafka.streams.processor.internals.ProcessorNode$2.run(ProcessorNode.java:53) > at > 
org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201) > at > org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:111) > {noformat} > One can work around this by piping the result through a topic: > {noformat} > final KTable filteredKTable = > builder.table("table-topic").filter(...).through("TOPIC"); > builder.stream("stream-topic").join(filteredKTable,...); > {noformat} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (KAFKA-6398) Non-aggregation KTable generation operator does not construct value getter correctly
[ https://issues.apache.org/jira/browse/KAFKA-6398?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-6398: - Labels: bug (was: ) -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (KAFKA-6398) Non-aggregation KTable generation operator does not construct value getter correctly
[ https://issues.apache.org/jira/browse/KAFKA-6398?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-6398: - Priority: Critical (was: Major) -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (KAFKA-6398) Non-aggregation KTable generation operator does not construct value getter correctly
[ https://issues.apache.org/jira/browse/KAFKA-6398?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-6398: - Description: For any operator that generates a KTable, its {{valueGetterSupplier}} has three code paths: 1. If the operator is a KTable source operator, use its materialized state store for the value getter (note that currently we always materialize the KTable source). 2. If the operator is an aggregation operator, its generated KTable is always materialized, so we just use its materialized state store. 3. Otherwise, we treat the value getter on a per-operator basis. For 3) above, what we SHOULD do is: if the generated KTable is materialized, the value getter just relies on its materialized state store to get the value; otherwise we rely on the operator itself to define which parent's value getter to inherit and what computational logic to apply on the fly to get the value. For example, for {{KTable#filter()}} where {{Materialized}} is not specified, {{KTableFilterValueGetter}} just reads from the parent's value getter and applies the filter on the fly; in addition we should let future operators access the parent's materialized state store via {{connectProcessorAndStateStore}}. However, the current code does not do this correctly: 1) it does not check whether the result KTable is materialized, but always tries to use its parent's value getter, and 2) it does not try to connect its parent's materialized store to the downstream operator. As a result, operators such as {{KTable#filter}}, {{KTable#mapValues}}, and {{KTable#join(KTable)}} result in a TopologyException when building. 
The following is an example: Using a non-materialized KTable in a stream-table join fails: {noformat} final KTable filteredKTable = builder.table("table-topic").filter(...); builder.stream("stream-topic").join(filteredKTable,...); {noformat} fails with {noformat} org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: StateStore null is not added yet. at org.apache.kafka.streams.processor.TopologyBuilder.connectProcessorAndStateStore(TopologyBuilder.java:1021) at org.apache.kafka.streams.processor.TopologyBuilder.connectProcessorAndStateStores(TopologyBuilder.java:949) at org.apache.kafka.streams.kstream.internals.KStreamImpl.doStreamTableJoin(KStreamImpl.java:621) at org.apache.kafka.streams.kstream.internals.KStreamImpl.join(KStreamImpl.java:577) at org.apache.kafka.streams.kstream.internals.KStreamImpl.join(KStreamImpl.java:563) {noformat} Adding a store name is not sufficient as workaround but fails differently: {noformat} final KTable filteredKTable = builder.table("table-topic").filter(..., "STORE-NAME"); builder.stream("stream-topic").join(filteredKTable,...); {noformat} error: {noformat} org.apache.kafka.streams.errors.StreamsException: failed to initialize processor KSTREAM-JOIN-05 at org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:113) at org.apache.kafka.streams.processor.internals.StreamTask.initTopology(StreamTask.java:339) at org.apache.kafka.streams.processor.internals.StreamTask.initialize(StreamTask.java:153) Caused by: org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: Processor KSTREAM-JOIN-05 has no access to StateStore KTABLE-SOURCE-STATE-STORE-00 at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.getStateStore(ProcessorContextImpl.java:69) at org.apache.kafka.streams.kstream.internals.KTableSourceValueGetterSupplier$KTableSourceValueGetter.init(KTableSourceValueGetterSupplier.java:45) at 
org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterValueGetter.init(KTableFilter.java:121) at org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.init(KStreamKTableJoinProcessor.java:44) at org.apache.kafka.streams.processor.internals.ProcessorNode$2.run(ProcessorNode.java:53) at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201) at org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:111) {noformat} One can work around this by piping the result through a topic: {noformat} final KTable filteredKTable = builder.table("table-topic").filter(...).through("TOPIC"); builder.stream("stream-topic").join(filteredKTable,...); {noformat} Note that
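The delegation in code path 3 of the description can be illustrated outside the Kafka codebase. The sketch below is hypothetical (plain Map-backed stores and simplified interfaces, not the real {{KTableValueGetter}} API): a materialized table serves reads from its store, while a non-materialized filter wraps its parent's getter and applies the predicate on read.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

public class ValueGetterSketch {

    interface ValueGetter<K, V> {
        V get(K key);
    }

    // Code paths 1/2: a materialized table serves reads from its state store.
    static class StoreValueGetter<K, V> implements ValueGetter<K, V> {
        private final Map<K, V> store;

        StoreValueGetter(Map<K, V> store) {
            this.store = store;
        }

        public V get(K key) {
            return store.get(key);
        }
    }

    // Code path 3: a non-materialized filter has no store of its own; it
    // delegates to the parent's getter and applies the predicate on the fly,
    // returning null (no match) when the predicate rejects the value.
    static class FilterValueGetter<K, V> implements ValueGetter<K, V> {
        private final ValueGetter<K, V> parent;
        private final Predicate<V> predicate;

        FilterValueGetter(ValueGetter<K, V> parent, Predicate<V> predicate) {
            this.parent = parent;
            this.predicate = predicate;
        }

        public V get(K key) {
            V value = parent.get(key);
            return (value != null && predicate.test(value)) ? value : null;
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> store = new HashMap<>();
        store.put("a", 1);
        store.put("b", 10);
        ValueGetter<String, Integer> source = new StoreValueGetter<>(store);
        ValueGetter<String, Integer> filtered =
                new FilterValueGetter<>(source, v -> v >= 5);
        System.out.println(filtered.get("a")); // prints "null": rejected by the filter
        System.out.println(filtered.get("b")); // prints "10"
    }
}
```

In these terms, the reported failure is the join processor asking for a state store that the non-materialized filter never registered or connected.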
[jira] [Commented] (KAFKA-6398) Non-aggregation KTable generation operator does not construct value getter correctly
[ https://issues.apache.org/jira/browse/KAFKA-6398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16317240#comment-16317240 ] Guozhang Wang commented on KAFKA-6398: -- [~bja...@isi.nc] I can reproduce the issue you mentioned, and I have a theory on its root cause, which is reflected in the updated description of this JIRA. My plan is to merge https://github.com/apache/kafka/pull/4384/files first, which fixes a minor issue but not this ticket as a whole, and then will continue to work on this ticket. > Non-aggregation KTable generation operator does not construct value getter > correctly > > > Key: KAFKA-6398 > URL: https://issues.apache.org/jira/browse/KAFKA-6398 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.11.0.1, 1.0.0 >Reporter: Matthias J. Sax >Assignee: Guozhang Wang >Priority: Critical > Labels: bug > > For any operator that generates a KTable, its {{valueGetterSupplier}} has > three code path: > 1. If the operator is a KTable source operator, using its materialized state > store for value getter (note that currently we always materialize on KTable > source). > 2. If the operator is an aggregation operator, then its generated KTable > should always be materialized so we just use its materialized state store. > 3. Otherwise, we treat the value getter in a per-operator basis. > For 3) above, what we SHOULD do is that, if the generated KTable is > materialized, the value getter would just rely on its materialized state > store to get the value; otherwise we just rely on the operator itself to > define which parent's value getter to inherit and what computational logic to > apply on-the-fly to get the value. 
For example, for {{KTable#filter()}} where > the {{Materialized}} is not specified, in {{KTableFilterValueGetter}} we just > get from the parent's value getter and then apply the filter on the fly; and in > addition we should let future operators access the parent's > materialized state store via {{connectProcessorAndStateStore}}. > However, the current code does not do this correctly: it 1) does not check whether the > result KTable is materialized, but always tries to use its parent's > value getter, and 2) it does not try to connect its parent's materialized > store to the future operator. As a result, operators such as > {{KTable#filter}}, {{KTable#mapValues}}, and {{KTable#join(KTable)}} > result in a TopologyBuilderException when building. The following is an example: > > Using a non-materialized KTable in a stream-table join fails: > {noformat} > final KTable filteredKTable = builder.table("table-topic").filter(...); > builder.stream("stream-topic").join(filteredKTable,...); > {noformat} > with > {noformat} > org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology > building: StateStore null is not added yet. 
> at > org.apache.kafka.streams.processor.TopologyBuilder.connectProcessorAndStateStore(TopologyBuilder.java:1021) > at > org.apache.kafka.streams.processor.TopologyBuilder.connectProcessorAndStateStores(TopologyBuilder.java:949) > at > org.apache.kafka.streams.kstream.internals.KStreamImpl.doStreamTableJoin(KStreamImpl.java:621) > at > org.apache.kafka.streams.kstream.internals.KStreamImpl.join(KStreamImpl.java:577) > at > org.apache.kafka.streams.kstream.internals.KStreamImpl.join(KStreamImpl.java:563) > {noformat} > Adding a store name is not sufficient as workaround but fails differently: > {noformat} > final KTable filteredKTable = builder.table("table-topic").filter(..., > "STORE-NAME"); > builder.stream("stream-topic").join(filteredKTable,...); > {noformat} > error: > {noformat} > org.apache.kafka.streams.errors.StreamsException: failed to initialize > processor KSTREAM-JOIN-05 > at > org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:113) > at > org.apache.kafka.streams.processor.internals.StreamTask.initTopology(StreamTask.java:339) > at > org.apache.kafka.streams.processor.internals.StreamTask.initialize(StreamTask.java:153) > Caused by: org.apache.kafka.streams.errors.TopologyBuilderException: Invalid > topology building: Processor KSTREAM-JOIN-05 has no access to > StateStore KTABLE-SOURCE-STATE-STORE-00 > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.getStateStore(ProcessorContextImpl.java:69) > at > org.apache.kafka.streams.kstream.inte
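The rule described in this issue (serve reads from the operator's own store when the result KTable is materialized, otherwise derive the value from the parent's getter on the fly) can be sketched in plain Java. The class and method names below (`FilteredTable`, `ValueGetter`) are illustrative stand-ins, not Kafka's actual internals:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

public class ValueGetterSketch {
    interface ValueGetter<K, V> { V get(K key); }

    // Hypothetical stand-in for a filter-generated table, illustrating the
    // rule from the issue description: if the result table is materialized,
    // serve reads from its own store; otherwise derive the value from the
    // parent's getter and apply the filter on the fly.
    static class FilteredTable<K, V> {
        final ValueGetter<K, V> parent;
        final Predicate<V> predicate;
        final Map<K, V> store; // null when the result table is not materialized

        FilteredTable(ValueGetter<K, V> parent, Predicate<V> predicate, Map<K, V> store) {
            this.parent = parent;
            this.predicate = predicate;
            this.store = store;
        }

        ValueGetter<K, V> valueGetter() {
            if (store != null)
                return store::get; // materialized: read the operator's own store
            return key -> {        // not materialized: compute from the parent
                V value = parent.get(key);
                return (value != null && predicate.test(value)) ? value : null;
            };
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> parentStore = new HashMap<>();
        parentStore.put("a", 1);
        parentStore.put("b", 2);

        // Non-materialized filtered table: the filter runs against the parent.
        FilteredTable<String, Integer> filtered =
                new FilteredTable<>(parentStore::get, v -> v % 2 == 0, null);
        System.out.println(filtered.valueGetter().get("a")); // filtered out: null
        System.out.println(filtered.valueGetter().get("b")); // passes: 2
    }
}
```

The bug described above corresponds to always taking the second branch (and never wiring the parent's store to downstream processors), even when the result table is materialized.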
[jira] [Commented] (KAFKA-6422) When enable trace level log in mirror maker, it will throw null pointer exception and the mirror maker will shutdown
[ https://issues.apache.org/jira/browse/KAFKA-6422?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16317292#comment-16317292 ] ASF GitHub Bot commented on KAFKA-6422: --- hachikuji closed pull request #4387: KAFKA-6422 Mirror maker will throw null pointer exception when the message value is null URL: https://github.com/apache/kafka/pull/4387 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index 618fd2a95b9..907fe20f414 100755 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -67,7 +67,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { private[tools] var producer: MirrorMakerProducer = null private var mirrorMakerThreads: Seq[MirrorMakerThread] = null - private val isShuttingdown: AtomicBoolean = new AtomicBoolean(false) + private val isShuttingDown: AtomicBoolean = new AtomicBoolean(false) // Track the messages not successfully sent by mirror maker. private val numDroppedMessages: AtomicInteger = new AtomicInteger(0) private var messageHandler: MirrorMakerMessageHandler = null @@ -384,7 +384,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { } def cleanShutdown() { -if (isShuttingdown.compareAndSet(false, true)) { +if (isShuttingDown.compareAndSet(false, true)) { info("Start clean shutdown.") // Shutdown consumer threads. 
info("Shutting down consumer threads.") @@ -426,7 +426,11 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { try { while (!exitingOnSendFailure && !shuttingDown && mirrorMakerConsumer.hasData) { val data = mirrorMakerConsumer.receive() - trace("Sending message with value size %d and offset %d".format(data.value.length, data.offset)) + if (data.value != null) { +trace("Sending message with value size %d and offset %d.".format(data.value.length, data.offset)) + } else { +trace("Sending message with null value and offset %d.".format(data.offset)) + } val records = messageHandler.handle(data) records.asScala.foreach(producer.send) maybeFlushAndCommitOffsets() @@ -459,7 +463,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { shutdownLatch.countDown() info("Mirror maker thread stopped") // if it exits accidentally, stop the entire mirror maker -if (!isShuttingdown.get()) { +if (!isShuttingDown.get()) { fatal("Mirror maker thread exited abnormally, stopping the whole mirror maker.") sys.exit(-1) } This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > When enable trace level log in mirror maker, it will throw null pointer > exception and the mirror maker will shutdown > > > Key: KAFKA-6422 > URL: https://issues.apache.org/jira/browse/KAFKA-6422 > Project: Kafka > Issue Type: Bug > Components: tools >Affects Versions: 0.10.0.0, 0.10.1.0, 0.10.2.0, 0.11.0.0, 0.11.0.1, > 0.11.0.2 >Reporter: Xin Li >Assignee: Xin Li >Priority: Minor > Labels: easyfix > Fix For: 0.11.0.0 > > > https://github.com/apache/kafka/blob/0.10.0/core/src/main/scala/kafka/tools/MirrorMaker.scala#L414 > when enable trace level log in mirror maker, if the message value is null, it > will throw null pointer exception, and mirror maker will shutdown because of > that. 
-- This message was sent by Atlassian JIRA (v6.4.14#64029)
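The patch above guards the trace message so a null record value (a tombstone) is never dereferenced. The same pattern, reduced to plain Java with a hypothetical `traceLine` helper (the real MirrorMaker code is Scala):

```java
public class TraceMessageSketch {
    // Hypothetical helper mirroring the patched logic above: build the trace
    // line without dereferencing a null record value (e.g. a tombstone).
    static String traceLine(byte[] value, long offset) {
        if (value != null)
            return String.format("Sending message with value size %d and offset %d.", value.length, offset);
        return String.format("Sending message with null value and offset %d.", offset);
    }

    public static void main(String[] args) {
        System.out.println(traceLine(new byte[]{1, 2, 3}, 42L)); // value size 3
        System.out.println(traceLine(null, 43L)); // no NullPointerException
    }
}
```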
[jira] [Updated] (KAFKA-6366) StackOverflowError in kafka-coordinator-heartbeat-thread
[ https://issues.apache.org/jira/browse/KAFKA-6366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joerg Heinicke updated KAFKA-6366: -- Attachment: ConverterProcessor_DEBUG.zip > StackOverflowError in kafka-coordinator-heartbeat-thread > > > Key: KAFKA-6366 > URL: https://issues.apache.org/jira/browse/KAFKA-6366 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 1.0.0 >Reporter: Joerg Heinicke >Assignee: Jason Gustafson > Attachments: 6366.v1.txt, ConverterProcessor.zip, > ConverterProcessor_DEBUG.zip, Screenshot-2017-12-19 21.35-22.10 processing.png > > > With Kafka 1.0 our consumer groups fall into a permanent cycle of rebalancing > once a StackOverflowError in the heartbeat thread occurred due to > connectivity issues of the consumers to the coordinating broker: > Immediately before the exception there are hundreds, if not thousands of log > entries of following type: > 2017-12-12 16:23:12.361 [kafka-coordinator-heartbeat-thread | > my-consumer-group] INFO - [Consumer clientId=consumer-4, > groupId=my-consumer-group] Marking the coordinator : (id: > 2147483645 rack: null) dead > The exceptions always happen somewhere in the DateFormat code, even > though at different lines. 
> 2017-12-12 16:23:12.363 [kafka-coordinator-heartbeat-thread | > my-consumer-group] ERROR - Uncaught exception in thread > 'kafka-coordinator-heartbeat-thread | my-consumer-group': > java.lang.StackOverflowError > at > java.text.DateFormatSymbols.getProviderInstance(DateFormatSymbols.java:362) > at > java.text.DateFormatSymbols.getInstance(DateFormatSymbols.java:340) > at java.util.Calendar.getDisplayName(Calendar.java:2110) > at java.text.SimpleDateFormat.subFormat(SimpleDateFormat.java:1125) > at java.text.SimpleDateFormat.format(SimpleDateFormat.java:966) > at java.text.SimpleDateFormat.format(SimpleDateFormat.java:936) > at java.text.DateFormat.format(DateFormat.java:345) > at > org.apache.log4j.helpers.PatternParser$DatePatternConverter.convert(PatternParser.java:443) > at > org.apache.log4j.helpers.PatternConverter.format(PatternConverter.java:65) > at org.apache.log4j.PatternLayout.format(PatternLayout.java:506) > at > org.apache.log4j.WriterAppender.subAppend(WriterAppender.java:310) > at org.apache.log4j.WriterAppender.append(WriterAppender.java:162) > at > org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251) > at > org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66) > at org.apache.log4j.Category.callAppenders(Category.java:206) > at org.apache.log4j.Category.forcedLog(Category.java:391) > at org.apache.log4j.Category.log(Category.java:856) > at > org.slf4j.impl.Log4jLoggerAdapter.info(Log4jLoggerAdapter.java:324) > at > org.apache.kafka.common.utils.LogContext$KafkaLogger.info(LogContext.java:341) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead(AbstractCoordinator.java:649) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onFailure(AbstractCoordinator.java:797) > at > org.apache.kafka.clients.consumer.internals.RequestFuture$1.onFailure(RequestFuture.java:209) > at > 
org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:496) > ... > the following 9 lines are repeated around hundred times. > ... > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:496) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:353) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.failUnsentRequests(ConsumerNetworkClient.java:416) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.disconnect(ConsumerNetworkClient.java:388) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead(AbstractCoordinator.java:653) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onFailure(AbstractCoordinator.java:797) > at > org.apache.kafka.clients.consumer.internals.RequestFuture$1.onFailure(R
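The repeated run of frames in the trace (marking the coordinator dead fails unsent requests, whose completion handlers mark the coordinator dead again) is an unbounded mutual recursion. A minimal, purely illustrative reproduction of that failure mode, with hypothetical method names rather than Kafka's actual code:

```java
public class MutualRecursionSketch {
    // Illustrative only: two methods that re-enter each other with no base
    // case, mimicking the coordinatorDead -> failUnsentRequests ->
    // fireCompletion -> coordinatorDead cycle in the stack trace above.
    static void markCoordinatorDead() {
        failUnsentRequests();
    }

    static void failUnsentRequests() {
        // In the real bug, failing a request fires a completion handler that
        // marks the coordinator dead again, so the recursion never terminates.
        markCoordinatorDead();
    }

    public static void main(String[] args) {
        try {
            markCoordinatorDead();
        } catch (StackOverflowError e) {
            System.out.println("StackOverflowError after unbounded re-entry");
        }
    }
}
```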
[jira] [Created] (KAFKA-6433) Connect distributed workers should fail if their config is "incompatible" with leader's
Randall Hauch created KAFKA-6433: Summary: Connect distributed workers should fail if their config is "incompatible" with leader's Key: KAFKA-6433 URL: https://issues.apache.org/jira/browse/KAFKA-6433 Project: Kafka Issue Type: Improvement Components: KafkaConnect Affects Versions: 1.0.0 Reporter: Randall Hauch Currently, each distributed worker config must have the same `worker.id` and must use the same internal topics for configs, offsets, and status. Additionally, each worker must be configured to have the same connectors, SMTs, and converters; confusing error messages result when some workers are able to deploy connector tasks with SMTs while others fail because they are missing plugins that the other workers have. Ideally, a Connect worker would only be allowed to join the cluster if it were "compatible" with the existing cluster, where "compatible" perhaps includes using the same internal topics and having the same set of plugins. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
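A compatibility check along the lines proposed in this issue might compare internal topic names and plugin sets between the leader and a joining worker. The sketch below uses hypothetical class and field names; the actual mechanism, and what "compatible" means, would be defined by the KIP:

```java
import java.util.Objects;
import java.util.Set;

public class WorkerCompatibilitySketch {
    // Hypothetical "compatibility" check along the lines proposed in the
    // issue: same internal topics and same plugin set as the leader. All
    // class and field names here are illustrative, not Connect's actual API.
    static class WorkerInfo {
        final String configTopic, offsetTopic, statusTopic;
        final Set<String> plugins;

        WorkerInfo(String configTopic, String offsetTopic, String statusTopic, Set<String> plugins) {
            this.configTopic = configTopic;
            this.offsetTopic = offsetTopic;
            this.statusTopic = statusTopic;
            this.plugins = plugins;
        }
    }

    static boolean compatible(WorkerInfo leader, WorkerInfo joiner) {
        return Objects.equals(leader.configTopic, joiner.configTopic)
                && Objects.equals(leader.offsetTopic, joiner.offsetTopic)
                && Objects.equals(leader.statusTopic, joiner.statusTopic)
                && leader.plugins.equals(joiner.plugins);
    }

    public static void main(String[] args) {
        WorkerInfo leader = new WorkerInfo("connect-configs", "connect-offsets", "connect-status", Set.of("jdbc", "s3"));
        WorkerInfo sameConfig = new WorkerInfo("connect-configs", "connect-offsets", "connect-status", Set.of("jdbc", "s3"));
        WorkerInfo missingPlugin = new WorkerInfo("connect-configs", "connect-offsets", "connect-status", Set.of("jdbc"));
        System.out.println(compatible(leader, sameConfig));    // true
        System.out.println(compatible(leader, missingPlugin)); // false
    }
}
```

As the later comments note, shipping this information in the rebalance subprotocol metadata (rather than the status topic) is what would make such a check possible.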
[jira] [Commented] (KAFKA-6433) Connect distributed workers should fail if their config is "incompatible" with leader's
[ https://issues.apache.org/jira/browse/KAFKA-6433?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16317325#comment-16317325 ] Randall Hauch commented on KAFKA-6433: -- Incidentally, it would not help to change what Connect stores in the status topic, because if the workers are using different status topics they would read different status information. A better option might be to ship this additional information in the metadata used in Connect's rebalance subprotocol. We can't do this today, but we're talking about evolving the protocol for incremental rebalance, and it'd be great to also add additional worker metadata during that evolution, as well as to tolerate optional metadata, which would enable adding metadata fields that are not necessarily uniform across the cluster. > Connect distributed workers should fail if their config is "incompatible" > with leader's > --- > > Key: KAFKA-6433 > URL: https://issues.apache.org/jira/browse/KAFKA-6433 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 1.0.0 >Reporter: Randall Hauch > > Currently, each distributed worker config must have the same `worker.id` and > must use the same internal topics for configs, offsets, and status. > Additionally, each worker must be configured to have the same connectors, > SMTs, and converters; confusing error messages result when some workers > are able to deploy connector tasks with SMTs while others fail because they are > missing plugins that the other workers have. > Ideally, a Connect worker would only be allowed to join the cluster if it > were "compatible" with the existing cluster, where "compatible" perhaps > includes using the same internal topics and having the same set of plugins. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (KAFKA-6433) Connect distributed workers should fail if their config is "incompatible" with leader's
[ https://issues.apache.org/jira/browse/KAFKA-6433?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch updated KAFKA-6433: - Labels: needs-kip (was: ) > Connect distributed workers should fail if their config is "incompatible" > with leader's > --- > > Key: KAFKA-6433 > URL: https://issues.apache.org/jira/browse/KAFKA-6433 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 1.0.0 >Reporter: Randall Hauch > Labels: needs-kip > > Currently, each distributed worker config must have the same `worker.id` and > must use the same internal topics for configs, offsets, and status. > Additionally, each worker must be configured to have the same connectors, > SMTs, and converters; confusing error messages result when some workers > are able to deploy connector tasks with SMTs while others fail because they are > missing plugins that the other workers have. > Ideally, a Connect worker would only be allowed to join the cluster if it > were "compatible" with the existing cluster, where "compatible" perhaps > includes using the same internal topics and having the same set of plugins. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-6433) Connect distributed workers should fail if their config is "incompatible" with leader's
[ https://issues.apache.org/jira/browse/KAFKA-6433?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16317329#comment-16317329 ] Randall Hauch commented on KAFKA-6433: -- KAFKA-5505 will likely be implemented as a change/evolution of Connect's rebalance subprotocol, and the requirements to solve this issue should be considered as part of that effort. > Connect distributed workers should fail if their config is "incompatible" > with leader's > --- > > Key: KAFKA-6433 > URL: https://issues.apache.org/jira/browse/KAFKA-6433 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 1.0.0 >Reporter: Randall Hauch > Labels: needs-kip > > Currently, each distributed worker config must have the same `worker.id` and > must use the same internal topics for configs, offsets, and status. > Additionally, each worker must be configured to have the same connectors, > SMTs, and converters; confusing error messages result when some workers > are able to deploy connector tasks with SMTs while others fail because they are > missing plugins that the other workers have. > Ideally, a Connect worker would only be allowed to join the cluster if it > were "compatible" with the existing cluster, where "compatible" perhaps > includes using the same internal topics and having the same set of plugins. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5505) Connect: Do not restart connector and existing tasks on task-set change
[ https://issues.apache.org/jira/browse/KAFKA-5505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16317333#comment-16317333 ] Randall Hauch commented on KAFKA-5505: -- We'll likely change/evolve Connect's rebalance subprotocol to enable implementing this improvement. See also KAFKA-6433 for another improvement that will require changes to the metadata used in the subprotocol. > Connect: Do not restart connector and existing tasks on task-set change > --- > > Key: KAFKA-5505 > URL: https://issues.apache.org/jira/browse/KAFKA-5505 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 0.10.2.1 >Reporter: Per Steffensen > > I am writing a connector with a frequently changing task-set. It is really > not working very well, because the connector and all existing tasks are > restarted when the set of tasks changes. E.g. if the connector is running > with 10 tasks, and an additional task is needed, the connector itself and all > 10 existing tasks are restarted, just to make the 11th task run also. My > tasks have a fairly heavy initialization, making it extra annoying. I would > like to see a change, introducing a "mode", where only new/deleted tasks are > started/stopped when notifying the system that the set of tasks changed > (calling context.requestTaskReconfiguration() - or something similar). > Discussed this issue a little on d...@kafka.apache.org in the thread "Kafka > Connect: To much restarting with a SourceConnector with dynamic set of tasks" -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-6366) StackOverflowError in kafka-coordinator-heartbeat-thread
[ https://issues.apache.org/jira/browse/KAFKA-6366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16317346#comment-16317346 ] Joerg Heinicke commented on KAFKA-6366: --- Sorry for the delay. Although I planned to at least try to extract the information from the system over the Christmas holidays, I didn't manage to. Now that I'm back from vacation, I have done so. I attached the log file [^ConverterProcessor_DEBUG.zip] (which is around 700k lines and 150 MB for about 5 minutes!). I don't see additional hints on the issue in it; I'm not sure whether it helps you confirm the scenario. We don't have dedicated test scenarios for the patch, which means we would have to run it directly in production - something I'm not too comfortable with as long as the scenario hasn't been confirmed. Another question that comes to mind is how the consumer will behave if we hit the scenario with the patch applied, since apparently all other threads are still able to commit while the failing thread (pool-5-thread-5 in the attached log file) marked the coordinator dead; in other words, what is the expected and presumably originally intended behavior? And on the most basic and practical side: how do I get a Kafka distribution with the patch applied? Apparently I will have to build it myself. Can you give me some kick-off hints? Is the documentation at https://github.com/apache/kafka all I need? 
> StackOverflowError in kafka-coordinator-heartbeat-thread > > > Key: KAFKA-6366 > URL: https://issues.apache.org/jira/browse/KAFKA-6366 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 1.0.0 >Reporter: Joerg Heinicke >Assignee: Jason Gustafson > Attachments: 6366.v1.txt, ConverterProcessor.zip, > ConverterProcessor_DEBUG.zip, Screenshot-2017-12-19 21.35-22.10 processing.png > > > With Kafka 1.0 our consumer groups fall into a permanent cycle of rebalancing > once a StackOverflowError in the heartbeat thread occurred due to > connectivity issues of the consumers to the coordinating broker: > Immediately before the exception there are hundreds, if not thousands of log > entries of following type: > 2017-12-12 16:23:12.361 [kafka-coordinator-heartbeat-thread | > my-consumer-group] INFO - [Consumer clientId=consumer-4, > groupId=my-consumer-group] Marking the coordinator : (id: > 2147483645 rack: null) dead > The exceptions always happen somewhere in the DateFormat code, even > though at different lines. 
> 2017-12-12 16:23:12.363 [kafka-coordinator-heartbeat-thread | > my-consumer-group] ERROR - Uncaught exception in thread > 'kafka-coordinator-heartbeat-thread | my-consumer-group': > java.lang.StackOverflowError > at > java.text.DateFormatSymbols.getProviderInstance(DateFormatSymbols.java:362) > at > java.text.DateFormatSymbols.getInstance(DateFormatSymbols.java:340) > at java.util.Calendar.getDisplayName(Calendar.java:2110) > at java.text.SimpleDateFormat.subFormat(SimpleDateFormat.java:1125) > at java.text.SimpleDateFormat.format(SimpleDateFormat.java:966) > at java.text.SimpleDateFormat.format(SimpleDateFormat.java:936) > at java.text.DateFormat.format(DateFormat.java:345) > at > org.apache.log4j.helpers.PatternParser$DatePatternConverter.convert(PatternParser.java:443) > at > org.apache.log4j.helpers.PatternConverter.format(PatternConverter.java:65) > at org.apache.log4j.PatternLayout.format(PatternLayout.java:506) > at > org.apache.log4j.WriterAppender.subAppend(WriterAppender.java:310) > at org.apache.log4j.WriterAppender.append(WriterAppender.java:162) > at > org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251) > at > org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66) > at org.apache.log4j.Category.callAppenders(Category.java:206) > at org.apache.log4j.Category.forcedLog(Category.java:391) > at org.apache.log4j.Category.log(Category.java:856) > at > org.slf4j.impl.Log4jLoggerAdapter.info(Log4jLoggerAdapter.java:324) > at > org.apache.kafka.common.utils.LogContext$KafkaLogger.info(LogContext.java:341) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead(AbstractCoordinator.java:649) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onFailure(AbstractCoordinator.java:797) > at > org.apache.kafka.clients.consumer.internals.RequestFuture$1.onFailure(RequestFuture.java:209) > at > 
org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177) > at > org.apache.kafka.clie
[jira] [Commented] (KAFKA-6096) Add concurrent tests to exercise all paths in group/transaction managers
[ https://issues.apache.org/jira/browse/KAFKA-6096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16317394#comment-16317394 ] ASF GitHub Bot commented on KAFKA-6096: --- hachikuji closed pull request #4122: KAFKA-6096: Add multi-threaded tests for group coordinator, txn manager URL: https://github.com/apache/kafka/pull/4122 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala new file mode 100644 index 000..0ecc3f538b1 --- /dev/null +++ b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala @@ -0,0 +1,226 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.coordinator + +import java.util.{ Collections, Random } +import java.util.concurrent.{ ConcurrentHashMap, Executors } +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.locks.Lock + +import kafka.coordinator.AbstractCoordinatorConcurrencyTest._ +import kafka.log.Log +import kafka.server._ +import kafka.utils._ +import kafka.utils.timer.MockTimer +import kafka.zk.KafkaZkClient +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.record.{ MemoryRecords, RecordBatch, RecordsProcessingStats } +import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse +import org.easymock.EasyMock +import org.junit.{ After, Before } + +import scala.collection._ +import scala.collection.JavaConverters._ + +abstract class AbstractCoordinatorConcurrencyTest[M <: CoordinatorMember] { + + val nThreads = 5 + + val time = new MockTime + val timer = new MockTimer + val executor = Executors.newFixedThreadPool(nThreads) + val scheduler = new MockScheduler(time) + var replicaManager: TestReplicaManager = _ + var zkClient: KafkaZkClient = _ + val serverProps = TestUtils.createBrokerConfig(nodeId = 0, zkConnect = "") + val random = new Random + + @Before + def setUp() { + +replicaManager = EasyMock.partialMockBuilder(classOf[TestReplicaManager]).createMock() +replicaManager.createDelayedProducePurgatory(timer) + +zkClient = EasyMock.createNiceMock(classOf[KafkaZkClient]) + } + + @After + def tearDown() { +EasyMock.reset(replicaManager) +if (executor != null) + executor.shutdownNow() + } + + /** +* Verify that concurrent operations run in the normal sequence produce the expected results. 
+*/ + def verifyConcurrentOperations(createMembers: String => Set[M], operations: Seq[Operation]) { +OrderedOperationSequence(createMembers("verifyConcurrentOperations"), operations).run() + } + + /** +* Verify that arbitrary operations run in some random sequence don't leave the coordinator +* in a bad state. Operations in the normal sequence should continue to work as expected. +*/ + def verifyConcurrentRandomSequences(createMembers: String => Set[M], operations: Seq[Operation]) { +EasyMock.reset(replicaManager) +for (i <- 0 to 10) { + // Run some random operations + RandomOperationSequence(createMembers(s"random$i"), operations).run() + + // Check that proper sequences still work correctly + OrderedOperationSequence(createMembers(s"ordered$i"), operations).run() +} + } + + def verifyConcurrentActions(actions: Set[Action]) { +val futures = actions.map(executor.submit) +futures.map(_.get) +enableCompletion() +actions.foreach(_.await()) + } + + def enableCompletion(): Unit = { +replicaManager.tryCompleteDelayedRequests() +scheduler.tick() + } + + abstract class OperationSequence(members: Set[M], operations: Seq[Operation]) { +def actionSequence: Seq[Set[Action]] +def run(): Unit = { + actionSequence.foreach(verifyConcurrentActions) +} + } +
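The core pattern of the test harness above (submit a set of actions to a thread pool, wait on every future so worker-thread failures propagate, then verify the combined effect) can be reduced to a small self-contained sketch. The coordinator, purgatory, and mock-timer machinery of the real test is omitted, and the names below are illustrative:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

public class ConcurrentActionsSketch {
    // Minimal form of verifyConcurrentActions: submit every action to a
    // fixed thread pool, wait on each Future so exceptions thrown on worker
    // threads are rethrown here, then let the caller verify the result.
    static void runConcurrently(List<Runnable> actions, int nThreads) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(nThreads);
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (Runnable action : actions)
                futures.add(executor.submit(action));
            for (Future<?> future : futures)
                future.get(); // rethrows any failure from the action
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger counter = new AtomicInteger();
        List<Runnable> actions = new ArrayList<>();
        for (int i = 0; i < 20; i++)
            actions.add(counter::incrementAndGet);
        runConcurrently(actions, 5);
        System.out.println(counter.get()); // 20: every action completed
    }
}
```

Waiting on each `Future` is the important step: without it, an assertion failure inside an action would be silently swallowed by the executor.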
[jira] [Commented] (KAFKA-6384) TransactionsTest#testFencingOnSendOffsets sometimes fails with ProducerFencedException
[ https://issues.apache.org/jira/browse/KAFKA-6384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16317566#comment-16317566 ] Ted Yu commented on KAFKA-6384: --- Haven't seen this failure for a while. > TransactionsTest#testFencingOnSendOffsets sometimes fails with > ProducerFencedException > -- > > Key: KAFKA-6384 > URL: https://issues.apache.org/jira/browse/KAFKA-6384 > Project: Kafka > Issue Type: Bug >Reporter: Ted Yu > > From > https://builds.apache.org/job/kafka-trunk-jdk8/2283/testReport/junit/kafka.api/TransactionsTest/testFencingOnSendOffsets/ > : > {code} > org.scalatest.junit.JUnitTestFailedError: Got an unexpected exception from a > fenced producer. > at > org.scalatest.junit.AssertionsForJUnit$class.newAssertionFailedException(AssertionsForJUnit.scala:100) > at > org.scalatest.junit.JUnitSuite.newAssertionFailedException(JUnitSuite.scala:71) > at org.scalatest.Assertions$class.fail(Assertions.scala:1105) > at org.scalatest.junit.JUnitSuite.fail(JUnitSuite.scala:71) > at > kafka.api.TransactionsTest.testFencingOnSendOffsets(TransactionsTest.scala:357) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at 
org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at org.junit.runners.ParentRunner.run(ParentRunner.java:363) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.runTestClass(JUnitTestClassExecuter.java:114) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.execute(JUnitTestClassExecuter.java:57) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassProcessor.processTestClass(JUnitTestClassProcessor.java:66) > at > org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51) > at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35) > at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) > at > org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32) > at > org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93) > at com.sun.proxy.$Proxy1.processTestClass(Unknown Source) > at > 
org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:108) > at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35) > at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) > at > org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnecti
[jira] [Updated] (KAFKA-5863) Potential null dereference in DistributedHerder#reconfigureConnector()
[ https://issues.apache.org/jira/browse/KAFKA-5863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated KAFKA-5863: -- Description: Here is the call chain: {code} RestServer.httpRequest(reconfigUrl, "POST", taskProps, null); {code} In httpRequest(): {code} } else if (responseCode >= 200 && responseCode < 300) { InputStream is = connection.getInputStream(); T result = JSON_SERDE.readValue(is, responseFormat); {code} For readValue(): {code} public <T> T readValue(InputStream src, TypeReference valueTypeRef) throws IOException, JsonParseException, JsonMappingException { return (T) _readMapAndClose(_jsonFactory.createParser(src), _typeFactory.constructType(valueTypeRef)); {code} Then there would be an NPE in constructType(): {code} public JavaType constructType(TypeReference typeRef) { // 19-Oct-2015, tatu: Simpler variant like so should work return _fromAny(null, typeRef.getType(), EMPTY_BINDINGS); {code} > Potential null dereference in DistributedHerder#reconfigureConnector() > -- > > Key: KAFKA-5863 > URL: https://issues.apache.org/jira/browse/KAFKA-5863 > Project: Kafka > Issue Type: Bug >Reporter: Ted Yu >Priority: Minor > > Here is the call chain: > {code} >
RestServer.httpRequest(reconfigUrl, "POST", > taskProps, null); > {code} > In httpRequest(): > {code} > } else if (responseCode >= 200 && responseCode < 300) { > InputStream is = connection.getInputStream(); > T result = JSON_SERDE.readValue(is, responseFormat); > {code} > For readValue(): > {code} > public T readValue(InputStream src, TypeReference valueTypeRef) > throws IOException, JsonParseException, JsonMappingException > { > return (T) _readMapAndClose(_jsonFactory.createParser(src), > _typeFactory.constructType(valueTypeRef)); > {code} > Then there would be NPE in constructType(): > {code} > public JavaType constructType(TypeReference typeRef) > { > // 19-Oct-2015, tatu: Simpler variant like so should work > return _fromAny(null, typeRef.getType(), EMPTY_BINDINGS); > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
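The defensive fix implied by the report above can be sketched without any Connect or Jackson dependency. This is an illustrative stand-in only: `NullSafeResponse`, `readValue`, and `parseResponse` are hypothetical names, not Kafka's actual API; the point is simply that callers which pass a null type reference (expecting no response body) should skip deserialization instead of reaching the NPE in `constructType()`.

```java
import java.util.Objects;

// Illustrative sketch only: this class and its methods are hypothetical
// stand-ins for RestServer.httpRequest / JSON_SERDE.readValue, not real Kafka code.
public class NullSafeResponse {

    // Stand-in for JSON_SERDE.readValue(is, responseFormat):
    // dereferences the type reference, so a null argument means an NPE.
    static String readValue(String body, Object typeRef) {
        Objects.requireNonNull(typeRef, "typeRef must not be null");
        return body; // a real implementation would deserialize here
    }

    // Guarded variant: callers that pass null (expecting no response body)
    // skip parsing entirely instead of hitting the NPE.
    static String parseResponse(String body, Object typeRef) {
        if (typeRef == null) {
            return null;
        }
        return readValue(body, typeRef);
    }

    public static void main(String[] args) {
        System.out.println(parseResponse("{}", null));       // null, no NPE
        System.out.println(parseResponse("{}", "SomeType")); // {}
    }
}
```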
[jira] [Updated] (KAFKA-6403) Why kafka sync send message with 10 seconds delay
[ https://issues.apache.org/jira/browse/KAFKA-6403?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-6403: - Fix Version/s: (was: 1.0.0) [~change] Since you've only provided some configs, it'll probably be difficult to track down the issue. You'd probably need to provide at least more information about how you are measuring the timing, and even so the issue could be anywhere from the client to any of the brokers involved. Probably most useful here would be logs -- if this is reproducible, turn the logging on the client up to DEBUG level. I've removed the fix version as it does not make sense for a bug that affects 1.0.0 to also be fixable in 1.0.0. We can update the fix version if the issue is tracked down and we determine whether including it in a bugfix version or just the next major/minor release makes sense. > Why kafka sync send message with 10 seconds delay > -- > > Key: KAFKA-6403 > URL: https://issues.apache.org/jira/browse/KAFKA-6403 > Project: Kafka > Issue Type: Test > Components: producer >Affects Versions: 1.0.0 >Reporter: change >Priority: Blocker > > I have a timertask to send a message to kafka every half an hour, Statistics > reports > ||send starttime|send successfully time|delay/ms|| > |2017-12-26 15:50:25.413 |2017-12-26 15:50:35,447|10034| > |2017-12-26 16:20:35.419 |2017-12-26 16:20:45,483|10064| > |2017-12-26 17:28:20.708|2017-12-26 17:28:25,743|5035 | > |2017-12-26 18:44:20.447|2017-12-26 18:44:25,516|5069| > |2017-12-26 19:14:25.518|2017-12-26 19:14:30,547|5029| > ProducerConfig values: > acks = 1 > batch.size = 16384 > bootstrap.servers = [192.168.0.179:39092] > buffer.memory = 33554432 > client.id = > compression.type = none > connections.max.idle.ms = 54 > enable.idempotence = false > interceptor.classes = null > key.serializer = class > org.apache.kafka.common.serialization.StringSerializer > linger.ms = 0 > max.block.ms = 6 > max.in.flight.requests.per.connection = 5 > 
max.request.size = 1048576 > metadata.max.age.ms = 30 > metric.reporters = [] > metrics.num.samples = 2 > metrics.recording.level = INFO > metrics.sample.window.ms = 3 > partitioner.class = class > org.apache.kafka.clients.producer.internals.DefaultPartitioner > receive.buffer.bytes = 32768 > reconnect.backoff.max.ms = 1000 > reconnect.backoff.ms = 50 > request.timeout.ms = 3 > retries = 3 > retry.backoff.ms = 100 > sasl.jaas.config = null > sasl.kerberos.kinit.cmd = /usr/bin/kinit > sasl.kerberos.min.time.before.relogin = 6 > sasl.kerberos.service.name = null > sasl.kerberos.ticket.renew.jitter = 0.05 > sasl.kerberos.ticket.renew.window.factor = 0.8 > sasl.mechanism = GSSAPI > security.protocol = PLAINTEXT > send.buffer.bytes = 131072 > ssl.cipher.suites = null > ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] > ssl.endpoint.identification.algorithm = null > ssl.key.password = null > ssl.keymanager.algorithm = SunX509 > ssl.keystore.location = null > ssl.keystore.password = null > ssl.keystore.type = JKS > ssl.protocol = TLS > ssl.provider = null > ssl.secure.random.implementation = null > ssl.trustmanager.algorithm = PKIX > ssl.truststore.location = null > ssl.truststore.password = null > ssl.truststore.type = JKS > transaction.timeout.ms = 6 > transactional.id = null > value.serializer = class > org.apache.kafka.common.serialization.StringSerializer > [2017-12-26 03:30:58,042] INFO KafkaConfig values: > advertised.host.name = kafka-1.default.svc.cluster.local > advertised.listeners = null > advertised.port = null > alter.config.policy.class.name = null > authorizer.class.name = > auto.create.topics.enable = true > auto.leader.rebalance.enable = true > background.threads = 10 > broker.id = 1 > broker.id.generation.enable = true > broker.rack = null > compression.type = producer > connections.max.idle.ms = 60 > controlled.shutdown.enable = true > controlled.shutdown.max.retries = 3 > controlled.shutdown.retry.backoff.ms = 5000 > controller.socket.timeout.ms = 3 
> create.topic.policy.class.name = null > default.replication.factor = 3 > delete.records.purgatory.purge.interval.requests = 1 > delete.topic.enable = true > fetch.purgatory.purge.interval.requests = 1000 > group.initial.rebalance.delay.ms = 3000 >
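Ewen's suggestion above (turn client logging up to DEBUG) maps to a one-line change in the producer application's log4j configuration. A minimal `log4j.properties` sketch, assuming the standard log4j 1.x setup that the Kafka clients ship against (the appender layout is illustrative):

```
# Turn Kafka client logging up to DEBUG to capture request/response timing
log4j.logger.org.apache.kafka.clients=DEBUG

# Keep everything else at INFO so the log stays readable
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
```

With this in place, each produce request and its response are logged with timestamps, which narrows down whether the delay is in the client (e.g. metadata refresh, connection setup) or on the broker side.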
[jira] [Commented] (KAFKA-4686) Null Message payload is shutting down broker
[ https://issues.apache.org/jira/browse/KAFKA-4686?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16317654#comment-16317654 ] Ewen Cheslack-Postava commented on KAFKA-4686: -- [~ijuma] This has been marked critical, but has been getting bumped from version to version since May. Is this really something that should be slated for 1.0.1 if we keep bumping it down the road? (Admittedly it's "critical" and not "blocker", but still...) > Null Message payload is shutting down broker > > > Key: KAFKA-4686 > URL: https://issues.apache.org/jira/browse/KAFKA-4686 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.10.1.1 > Environment: Amazon Linux AMI release 2016.03 kernel > 4.4.19-29.55.amzn1.x86_64 >Reporter: Rodrigo Queiroz Saramago >Priority: Critical > Fix For: 1.0.1 > > Attachments: KAFKA-4686-NullMessagePayloadError.tar.xz, > kafkaServer.out > > > Hello, I have a test environment with 3 brokers and 1 zookeeper node, in > which clients connect using two-way ssl authentication. I use kafka version > 0.10.1.1. The system works as expected for a while, but if the broker goes > down and is then restarted, something gets corrupted and it is not possible to start the > broker again; it always fails with the same error. What does this error mean? What > can I do in this case? Is this the expected behavior? > [2017-01-23 07:03:28,927] ERROR There was an error in one of the threads > during logs loading: kafka.common.KafkaException: Message payload is null: > Message(magic = 0, attributes = 1, crc = 4122289508, key = null, payload = > null) (kafka.log.LogManager) > [2017-01-23 07:03:28,929] FATAL Fatal error during KafkaServer startup. 
> Prepare to shutdown (kafka.server.KafkaServer) > kafka.common.KafkaException: Message payload is null: Message(magic = 0, > attributes = 1, crc = 4122289508, key = null, payload = null) > at > kafka.message.ByteBufferMessageSet$$anon$1.<init>(ByteBufferMessageSet.scala:90) > at > kafka.message.ByteBufferMessageSet$.deepIterator(ByteBufferMessageSet.scala:85) > at kafka.message.MessageAndOffset.firstOffset(MessageAndOffset.scala:33) > at kafka.log.LogSegment.recover(LogSegment.scala:223) > at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:218) > at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:179) > at > scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733) > at > scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) > at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) > at > scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732) > at kafka.log.Log.loadSegments(Log.scala:179) > at kafka.log.Log.<init>(Log.scala:108) > at > kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:151) > at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:58) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > [2017-01-23 07:03:28,946] INFO shutting down (kafka.server.KafkaServer) > [2017-01-23 07:03:28,949] INFO Terminate ZkClient event thread. 
> (org.I0Itec.zkclient.ZkEventThread) > [2017-01-23 07:03:28,954] INFO EventThread shut down for session: > 0x159bd458ae70008 (org.apache.zookeeper.ClientCnxn) > [2017-01-23 07:03:28,954] INFO Session: 0x159bd458ae70008 closed > (org.apache.zookeeper.ZooKeeper) > [2017-01-23 07:03:28,957] INFO shut down completed (kafka.server.KafkaServer) > [2017-01-23 07:03:28,959] FATAL Fatal error during KafkaServerStartable > startup. Prepare to shutdown (kafka.server.KafkaServerStartable) > kafka.common.KafkaException: Message payload is null: Message(magic = 0, > attributes = 1, crc = 4122289508, key = null, payload = null) > at > kafka.message.ByteBufferMessageSet$$anon$1.<init>(ByteBufferMessageSet.scala:90) > at > kafka.message.ByteBufferMessageSet$.deepIterator(ByteBufferMessageSet.scala:85) > at kafka.message.MessageAndOffset.firstOffset(MessageAndOffset.scala:33) > at kafka.log.LogSegment.recover(LogSegment.scala:223) > at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:218) > at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:179) > at > scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733) > at > scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) >
[jira] [Commented] (KAFKA-6396) Possibly kafka-connect converter should be able to stop processing chain
[ https://issues.apache.org/jira/browse/KAFKA-6396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16317752#comment-16317752 ] Ewen Cheslack-Postava commented on KAFKA-6396: -- If I'm understanding your request properly, there are a couple of problems with what you're proposing. First, for transformations it would be a fundamental change to how they work. Today they work as SMTs: single message transforms, which means you get the entire message. For sink connectors we a) convert the entire message, b) transform the entire message, c) process the entire message in the sink connector. To pass the data to stage (b), it *must* have been fully processed, key and value, by (a). Second, what you want to do by checking the value for `null` doesn't work because `null` is valid for values. Transformations only remove the record if *the entire record* is returned as `null`. A `null` value will be written and potentially used for compaction if it is contained in a surrounding record. It would certainly be possible to write a system that worked the way you describe, but it requires a much more complicated processing pipeline. You need to define the order in which each component of the message is deserialized, and define different transformations for each (as well as potentially a transformation for the entire record if you want to support functionality Kafka Connect currently provides, like copying data between key/value). This gets even more complicated when you consider all the components, not all of which are in Kafka Connect yet: key, value, headers, timestamp, etc. To me, this smells like trying to fit a pretty highly optimized transformation pipeline into Connect simply because most of the building blocks are there to do so without coding. (I would consider any case where you're trying to avoid deserializing *parts* of a record a pretty optimized use case.) 
Personally, I'd recommend writing a small Kafka Streams app to handle this case, where you can carefully select how to deserialize and process the data, and interleave the processing of most components of the record carefully to optimize performance. > Possibly kafka-connect converter should be able to stop processing chain > > > Key: KAFKA-6396 > URL: https://issues.apache.org/jira/browse/KAFKA-6396 > Project: Kafka > Issue Type: Wish > Components: KafkaConnect >Affects Versions: 1.0.0 >Reporter: Alexander Koval >Priority: Minor > > At present only transformations can discard records returning null. But I > think sometimes it would be nice to discard processing chain after converting > message. For example I have some tags shipped with a message key and I want > to stop processing the message after converting its key (there are a lot of > messages and I don't want to deserialize message values that I don't need). > At the moment to do that I should disable converters and move message > deserializing to the transformation chain: > {code} > key.converter=org.apache.kafka.connect.converters.ByteArrayConverter > value.converter=org.apache.kafka.connect.converters.ByteArrayConverter > transforms=proto,catalog > transforms.proto.type=company.evo.kafka.ProtobufTransformation > transforms.proto.key.protobuf.class=company.evo.uaprom.indexator.KeyProto$KeyMessage > transforms.proto.value.protobuf.class=company.evo.uaprom.indexator.catalog.CompanyProto$UniversalCompanyMessage > transforms.proto.tag=catalog > {code} > If > [WorkerSinkTask|https://github.com/apache/kafka/blob/1.0.0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L453] > checked converted values on {{null}} it would solved my problem more > gracefully -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-4674) Frequent ISR shrinking and expanding and disconnects among brokers
[ https://issues.apache.org/jira/browse/KAFKA-4674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16317809#comment-16317809 ] Andrey Falko commented on KAFKA-4674: - I just reproduced this on a 5 node test cluster by gradually creating 35k 3x replicated topics and holding a consumer for each topic. I'm running the latest 1.0.0. How many partitions do you have and what is their replication factor in your setups? > Frequent ISR shrinking and expanding and disconnects among brokers > -- > > Key: KAFKA-4674 > URL: https://issues.apache.org/jira/browse/KAFKA-4674 > Project: Kafka > Issue Type: Bug > Components: controller, core >Affects Versions: 0.10.0.1 > Environment: OS: Redhat Linux 2.6.32-431.el6.x86_64 > JDK: 1.8.0_45 >Reporter: Kaiming Wan > Attachments: controller.log.rar, kafkabroker.20170221.log.zip, > server.log.2017-01-11-14, zookeeper.out.2017-01-11.log > > > We use a kafka cluster with 3 brokers in a production environment. It worked > well for several months. Recently, we got the UnderReplicatedPartitions>0 > warning mail. When we checked the log, we found that the partition was always > experiencing ISR shrinking and expanding, and the disconnection exception could > be found in the controller's log. > We also found some deviant output in zookeeper's log which pointed to a > consumer (using the old API that depends on zookeeper) which had stopped working with > a large lag. > Actually, this is not the first time we have encountered this problem. When we > first met this problem, we also found the same phenomenon and log output. > We solved the problem by deleting the consumer node info in zookeeper, and then > everything went well. > However, this time, after we deleted the consumer, which already had a > large lag, the frequent ISR shrinking and expanding didn't stop for a very > long time (several hours). Though the issue didn't affect our consumers and > producers, we think it makes our cluster unstable. 
So at last, we solved > this problem by restarting the controller broker. > And now I wonder what caused this problem. I checked the source code and > only know that a poll timeout will cause disconnection and ISR shrinking. Is the > issue related to zookeeper, because it cannot hold too many metadata > modifications and makes the replication fetch thread take more time? > I uploaded the log file in the attachment. -- This message was sent by Atlassian JIRA (v6.4.14#64029)