[jira] [Comment Edited] (KAFKA-6471) seekToEnd and seek give unclear results for Consumer with read_committed isolation level
[ https://issues.apache.org/jira/browse/KAFKA-6471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16337075#comment-16337075 ] Coen Damen edited comment on KAFKA-6471 at 1/24/18 7:40 AM:

Hi Jason, thanks for your reply. The use case is the following. We are retrieving log files from a machine, and the log records are transformed into Kafka messages. Writing a log file into Kafka is atomic: in case of a read failure (of a file) or a write failure (to Kafka), the transaction that writes the messages to Kafka should be aborted and tried again. On a retry, or when resuming the "job" after a longer idle period or a restart, we want to read from where processing was halted, i.e. the last successfully processed file. For this I expected to use seekToEnd with a Consumer that has the read_committed setting. But it moved to the end of the topic, even though the topic contained many aborted messages at the end. Note: the filename and the index within the file are part of the message, so we want to retrieve the last successful message and extract the filename from it. Thank you, Coen

was (Author: coenos): [the same comment, minus the note about the filename and index]

> seekToEnd and seek give unclear results for Consumer with read_committed isolation level
>
> Key: KAFKA-6471
> URL: https://issues.apache.org/jira/browse/KAFKA-6471
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 1.0.0
> Reporter: Coen Damen
> Priority: Major
>
> I am using the transactional KafkaProducer to send messages to a topic. This works fine. I use a KafkaConsumer with read_committed isolation level and I have an issue with the seek and seekToEnd methods. According to the documentation, seek and seekToEnd give me the LSO (Last Stable Offset). But this is a bit confusing: they always give me the same value, the END of the topic, no matter whether the last entry was committed (by the Producer) or part of an aborted transaction. For example, after I abort the last 5 tries to insert 20_000 messages, the last 100_000 records should not be read by the Consumer. Yet seekToEnd moves to the end of the topic (including the 100_000 messages), although poll() does not return them.
> I am looking for a way to retrieve the Last Committed Offset (i.e. the last message successfully committed by the Producer). There seems to be no proper API method for this, so do I need to roll my own?
> One option would be to move back and poll until no more records are retrieved; this would yield the last committed message. But I would assume that Kafka provides this method.
> We use Kafka 1.0.0.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
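The workaround described in the issue — walk backwards from the log end and take the first committed record — can be sketched as a toy model. This is illustrative Python, not the Kafka consumer API: `Record` and `last_committed_record` are hypothetical names, and a real implementation would seek and poll backwards with a read_committed KafkaConsumer.

```python
# A read_committed consumer hides aborted records, but seekToEnd still jumps
# to the log end offset, past an aborted tail. Model: each record carries a
# flag saying whether its transaction committed.

from dataclasses import dataclass
from typing import List, Optional

@dataclass
class Record:
    offset: int
    filename: str    # the processed file name carried in the message
    committed: bool  # False for records of aborted transactions

def last_committed_record(log: List[Record]) -> Optional[Record]:
    """Scan backwards from the end of the partition, skipping the aborted tail."""
    for rec in reversed(log):
        if rec.committed:
            return rec
    return None

log = [
    Record(0, "a.log", True),
    Record(1, "b.log", True),
    Record(2, "c.log", False),  # aborted transaction
    Record(3, "c.log", False),  # aborted transaction
]

# seekToEnd would position at offset 4 (log end), but the last *committed*
# record is at offset 1, which carries the last successfully processed file.
rec = last_committed_record(log)
```

In this model, `rec` is the record at offset 1 carrying `b.log`, which is exactly the "last successfully processed file" the reporter wants to recover.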
[jira] [Commented] (KAFKA-5142) KIP-145 - Expose Record Headers in Kafka Connect
[ https://issues.apache.org/jira/browse/KAFKA-5142?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16337069#comment-16337069 ] Randall Hauch commented on KAFKA-5142: -- KIP-145 has passed. > KIP-145 - Expose Record Headers in Kafka Connect > > > Key: KAFKA-5142 > URL: https://issues.apache.org/jira/browse/KAFKA-5142 > Project: Kafka > Issue Type: New Feature > Components: clients >Reporter: Michael Andre Pearce >Assignee: Michael Andre Pearce >Priority: Major > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-145+-+Expose+Record+Headers+in+Kafka+Connect > As KIP-82 introduced Headers into the core Kafka product, it would be > advantageous to expose them in the Kafka Connect framework. > Connectors that replicate data between Kafka clusters, or between other > messaging products and Kafka, would want to replicate the headers. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6468) Replication high watermark checkpoint file read for every LeaderAndIsrRequest
[ https://issues.apache.org/jira/browse/KAFKA-6468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16336969#comment-16336969 ] ASF GitHub Bot commented on KAFKA-6468: --- ambroff opened a new pull request #4468: KAFKA-6468 Read replication-offset-checkpoint once URL: https://github.com/apache/kafka/pull/4468 Only read the high watermark checkpoint file (replication-offset-checkpoint) once. Before this patch, this file is read every time the broker handles LeaderAndIsrRequest. See kafka.cluster.Partition#getOrCreateReplica(Int, Boolean). On my local test cluster of three brokers with around 40k partitions, the initial LeaderAndIsrRequest refers to every partition in the cluster, and it can take 20 to 30 minutes to create all of the replicas because the replication-offset-checkpoint is nearly 2MB. Changing this code so that we only read this file once on startup reduces the time to create all replicas to around one minute. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Replication high watermark checkpoint file read for every LeaderAndIsrRequest > - > > Key: KAFKA-6468 > URL: https://issues.apache.org/jira/browse/KAFKA-6468 > Project: Kafka > Issue Type: Bug >Reporter: Kyle Ambroff-Kao >Assignee: Kyle Ambroff-Kao >Priority: Major > > The high watermark for each partition in a given log directory is written to > disk every _replica.high.watermark.checkpoint.interval.ms_ milliseconds. This > checkpoint file is used to create replicas when joining the cluster. > [https://github.com/apache/kafka/blob/b73c765d7e172de4742a3aa023d5a0a4b7387247/core/src/main/scala/kafka/cluster/Partition.scala#L180] > Unfortunately this file is read every time > kafka.cluster.Partition#getOrCreateReplica is invoked. For most clusters this > isn't a big deal, but for a small cluster with lots of partitions all of the > reads of this file really add up. > On my local test cluster of three brokers with around 40k partitions, the > initial LeaderAndIsrRequest refers to every partition in the cluster, and it > can take 20 to 30 minutes to create all of the replicas because the > _replication-offset-checkpoint_ is nearly 2MB. > Changing this code so that we only read this file once on startup reduces the > time to create all replicas to around one minute. > Credit to [~onurkaraman] for finding this one. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
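The fix described above boils down to a caching pattern: parse replication-offset-checkpoint once at startup and serve per-partition lookups from memory, instead of re-reading the file for every partition in a LeaderAndIsrRequest. The sketch below is illustrative Python, not broker code; the file layout (a version line, an entry-count line, then "topic partition offset" rows) is assumed to mirror Kafka's checkpoint file format.

```python
# Hypothetical read-once cache for a high watermark checkpoint file.

from typing import Dict, Tuple

class HighWatermarkCheckpoint:
    def __init__(self, path: str):
        self._offsets: Dict[Tuple[str, int], int] = {}
        with open(path) as f:
            version = int(f.readline())   # first line: file format version
            expected = int(f.readline())  # second line: number of entries
            for line in f:
                topic, partition, offset = line.split()
                self._offsets[(topic, int(partition))] = int(offset)
        if len(self._offsets) != expected:
            raise ValueError("truncated checkpoint file")

    def high_watermark(self, topic: str, partition: int) -> int:
        # O(1) in-memory lookup; no file I/O on the hot path.
        return self._offsets.get((topic, partition), 0)

# Minimal usage with the assumed file layout:
with open("replication-offset-checkpoint", "w") as f:
    f.write("0\n2\ntopic-a 0 42\ntopic-b 3 7\n")

checkpoint = HighWatermarkCheckpoint("replication-offset-checkpoint")
hw = checkpoint.high_watermark("topic-a", 0)
```

With ~40k partitions, replacing 40k file reads with one parse is exactly the startup-time win the PR reports.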
[jira] [Commented] (KAFKA-5540) Deprecate and remove internal converter configs
[ https://issues.apache.org/jira/browse/KAFKA-5540?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16336964#comment-16336964 ] ASF GitHub Bot commented on KAFKA-5540: --- umesh9794 opened a new pull request #4467: KAFKA-5540 : Deprecate and remove internal converter configs URL: https://github.com/apache/kafka/pull/4467 This PR deprecates the following configs: `internal.key.converter` `internal.value.converter` And removes their usage from connect-*.properties files. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) > Deprecate and remove internal converter configs > --- > > Key: KAFKA-5540 > URL: https://issues.apache.org/jira/browse/KAFKA-5540 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 0.11.0.0 >Reporter: Ewen Cheslack-Postava >Priority: Major > Labels: needs-kip > > The internal.key.converter and internal.value.converter were originally exposed > as configs because a) they are actually pluggable and b) providing a default > would require relying on the JsonConverter always being available, which, until > we had classloader isolation, might have been removed for > compatibility reasons. > However, this has ultimately caused a lot more trouble and confusion > than it is worth. We should deprecate the configs, give them a default of > JsonConverter (which is also kind of nice since it results in human-readable > data in the internal topics), and then ultimately remove them in the next > major version. > These are all public APIs so this will need a small KIP before we can make > the change. 
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
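The resolution proposed in the ticket — stop requiring the internal converter configs and default them to JsonConverter — amounts to a config lookup with a fallback. A minimal sketch, assuming a plain dict of worker properties (the function name and dict shape are hypothetical, not the Connect runtime API):

```python
# Proposed behaviour: fall back to JsonConverter when the deprecated
# internal.*.converter properties are absent from the worker config.

JSON_CONVERTER = "org.apache.kafka.connect.json.JsonConverter"

def internal_converter_class(worker_config: dict, prop: str) -> str:
    """Resolve a deprecated internal converter property with a default."""
    return worker_config.get(prop, JSON_CONVERTER)

# A worker config that no longer sets the deprecated properties still works:
resolved = internal_converter_class({}, "internal.key.converter")
```

An explicitly configured value still wins, which preserves backward compatibility during the deprecation window.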
[jira] [Updated] (KAFKA-6477) Add Support for Quorum-based Producer Acknowledgment
[ https://issues.apache.org/jira/browse/KAFKA-6477?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Litao Deng updated KAFKA-6477: -- Description: Hey folks. I would like to add a feature to support quorum-based acknowledgment for produce requests. We have been running a modified version of Kafka on our testing cluster for weeks; the improvement in P999 is significant, with very stable latency. Additionally, I have a proposal to achieve similar data durability as with the insync.replicas-based acknowledgment through LEO-based leader election. [https://cwiki.apache.org/confluence/display/KAFKA/KIP-250+Add+Support+for+Quorum-based+Producer+Acknowledge] was: [the same description, without "of Kafka" and "for weeks"] > Add Support for Quorum-based Producer Acknowledgment > > > Key: KAFKA-6477 > URL: https://issues.apache.org/jira/browse/KAFKA-6477 > Project: Kafka > Issue Type: New Feature > Components: controller, producer >Reporter: Litao Deng >Priority: Major > > Hey folks. I would like to add a feature to support quorum-based > acknowledgment for produce requests. We have been running a modified > version of Kafka on our testing cluster for weeks; the improvement in P999 is > significant, with very stable latency. Additionally, I have a proposal to > achieve similar data durability as with the insync.replicas-based > acknowledgment through LEO-based leader election. > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-250+Add+Support+for+Quorum-based+Producer+Acknowledge] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
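The latency argument behind the proposal can be made concrete with a toy model. With acks=all, a produce request is acknowledged only once every in-sync replica has the data (the minimum replica log end offset), so one slow replica drags P999; a quorum-based ack needs only a majority. The helper below is illustrative Python, not broker code:

```python
# Toy model: which offset counts as acknowledged, given each replica's
# log end offset (LEO)?

from typing import List

def committed_offset(replica_leos: List[int], quorum: bool) -> int:
    leos = sorted(replica_leos, reverse=True)
    if quorum:
        majority = len(leos) // 2 + 1
        return leos[majority - 1]  # highest offset held by a majority
    return leos[-1]                # highest offset held by *all* replicas

leos = [100, 98, 60]  # one lagging replica
# all-replica ack is stuck at 60; a 2-of-3 quorum can acknowledge up to 98.
```

This is why the proposal pairs quorum acks with LEO-based leader election: durability then depends on always electing a leader from the majority that holds the acknowledged offsets.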
[jira] [Updated] (KAFKA-6477) Add Support for Quorum-based Producer Acknowledgment
[ https://issues.apache.org/jira/browse/KAFKA-6477?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Litao Deng updated KAFKA-6477: -- Summary: Add Support for Quorum-based Producer Acknowledgment (was: Add Support for Quorum-based Producer Acknowledge) > Add Support for Quorum-based Producer Acknowledgment > > > Key: KAFKA-6477 > URL: https://issues.apache.org/jira/browse/KAFKA-6477 > Project: Kafka > Issue Type: New Feature > Components: controller, producer >Reporter: Litao Deng >Priority: Major > > Hey folks. I would like to add a feature to support quorum-based > acknowledgment for produce requests. We have been running a modified > version on our testing cluster; the improvement in P999 is significant, with > very stable latency. Additionally, I have a proposal to achieve similar > data durability as with the insync.replicas-based acknowledgment through > LEO-based leader election. > https://cwiki.apache.org/confluence/display/KAFKA/KIP-250+Add+Support+for+Quorum-based+Producer+Acknowledge -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6477) Add Support for Quorum-based Producer Acknowledge
Litao Deng created KAFKA-6477: - Summary: Add Support for Quorum-based Producer Acknowledge Key: KAFKA-6477 URL: https://issues.apache.org/jira/browse/KAFKA-6477 Project: Kafka Issue Type: New Feature Components: controller, producer Reporter: Litao Deng Hey folks. I would like to add a feature to support quorum-based acknowledgment for produce requests. We have been running a modified version on our testing cluster; the improvement in P999 is significant, with very stable latency. Additionally, I have a proposal to achieve similar data durability as with the insync.replicas-based acknowledgment through LEO-based leader election. https://cwiki.apache.org/confluence/display/KAFKA/KIP-250+Add+Support+for+Quorum-based+Producer+Acknowledge -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-6336) when using assign() with kafka consumer the KafkaConsumerGroup command doesnt show those consumers
[ https://issues.apache.org/jira/browse/KAFKA-6336?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-6336. Resolution: Fixed > when using assign() with kafka consumer the KafkaConsumerGroup command doesnt > show those consumers > -- > > Key: KAFKA-6336 > URL: https://issues.apache.org/jira/browse/KAFKA-6336 > Project: Kafka > Issue Type: Bug >Reporter: Neerja Khattar >Priority: Major > > The issue is that when consumers use assign() rather than subscribe(), the > group's lag cannot be retrieved with the ConsumerGroup command; it does not > even list those groups. > The JMX tool also does not show lag properly. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-6421) consumer group can't show when use kafka-consumer-groups tool if the leader of __consumer_offsets partition for this group changed
[ https://issues.apache.org/jira/browse/KAFKA-6421?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-6421. Resolution: Fixed > consumer group can't show when use kafka-consumer-groups tool if the leader > of __consumer_offsets partition for this group changed > --- > > Key: KAFKA-6421 > URL: https://issues.apache.org/jira/browse/KAFKA-6421 > Project: Kafka > Issue Type: Bug > Components: offset manager, tools >Affects Versions: 0.10.2.0, 0.11.0.1, 1.0.0 >Reporter: yuemeng >Assignee: Vahid Hashemian >Priority: Critical > > The following steps reproduce this problem: > 1) In a 3-broker Kafka cluster, use kafka-console-consumer.sh to consume one > topic; suppose the group is named "test-consumer-group". After a while, we > stop consuming the topic. > 2) Suppose test-consumer-group writes its offset metadata to partition 3 of > __consumer_offsets, and the leader of partition 3 is broker 1. > 3) We scale the cluster to 5 nodes and reassign partitions, and the leader of > partition 3 changes to broker 0. > 4) Then kafka-consumer-groups.sh --list can't show test-consumer-group, even > though the offset metadata is still in the __consumer_offsets topic. > I found that the protocolType becomes empty for test-consumer-group after the > leader changed, so > {code} > listAllGroupsFlattened.filter(_.protocolType == > ConsumerProtocol.PROTOCOL_TYPE) > {code} > filters out test-consumer-group, and we can't use kafka-consumer-groups.sh to > list it -- This message was sent by Atlassian JIRA (v7.6.3#76005)
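The Scala filter quoted above hides any group whose protocolType is not "consumer". A minimal model shows why a group whose protocolType was lost (reloaded as the empty string after a coordinator change) disappears from --list even though its offsets still exist. The function and dict shape are illustrative, not the coordinator's actual data structures:

```python
# Model of the coordinator's group cache: group id -> protocolType.

from typing import Dict, List

CONSUMER_PROTOCOL_TYPE = "consumer"

def list_consumer_groups(groups: Dict[str, str]) -> List[str]:
    # Mirrors the quoted filter: only groups whose protocolType matches
    # ConsumerProtocol.PROTOCOL_TYPE are listed.
    return [gid for gid, ptype in groups.items()
            if ptype == CONSUMER_PROTOCOL_TYPE]

groups = {
    "healthy-group": "consumer",
    "test-consumer-group": "",  # protocolType lost after the leader change
}
visible = list_consumer_groups(groups)
```

In this model, "test-consumer-group" vanishes from the listing while its offset metadata would still be present — the exact symptom described in the reproduction steps.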
[jira] [Resolved] (KAFKA-6287) Inconsistent protocol type for empty consumer groups
[ https://issues.apache.org/jira/browse/KAFKA-6287?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-6287. Resolution: Fixed > Inconsistent protocol type for empty consumer groups > > > Key: KAFKA-6287 > URL: https://issues.apache.org/jira/browse/KAFKA-6287 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 1.0.0 >Reporter: Ryan Leslie >Assignee: Jason Gustafson >Priority: Minor > Fix For: 1.0.1 > > > When a consumer is created for a new group, the group metadata's protocol > type is set to 'consumer' and this is stored both in __consumer_offsets as > well as in the coordinator's local cache. > If the consumer leaves the group and the group becomes empty, ListGroups > requests will continue to show the group as type 'consumer', and as such > kafka-consumer-groups.sh will show it via --list. > However, if the coordinator (broker) node is killed and a new coordinator is > elected, when the GroupMetadataManager loads the group from > __consumer_offsets into its cache, it will not set the protocolType if there > are no active consumers. As a result, the group's protocolType will now > become the empty string (UNKNOWN_PROTOCOL_TYPE), and kafka-consumer-groups.sh > will no longer show the group. > Ideally bouncing a broker should not result in the group's protocol changing. > protocolType can be set in GroupMetadataManager.readGroupMessageValue() to > always reflect what's present in the persistent metadata (__consumer_offsets) > regardless if there are active members. > In general, things can get confusing when distinguishing between 'consumer' > and non-consumer groups. For example, DescribeGroups and OffsetFetchRequest > does not filter on protocol type, so it's possible for > kafka-consumer-groups.sh to describe groups (--describe) without actually > being able to list them. 
In the protocol guide, OffsetFetchRequest / > OffsetCommitRequest have their parameters listed as 'ConsumerGroup', but in > reality these can be used for groups of unknown type as well. For instance, a > consumer group can be copied by finding a coordinator > (GroupCoordinatorRequest / FindCoordinatorRequest) and sending an > OffsetCommitRequest. The group will be auto-created with an empty protocol. > Although this is arguably correct, the group will now exist but not be a > proper 'consumer' group until later when there is a JoinGroupRequest. Again, > this can be confusing as far as categorization / visibility of the group is > concerned. A group can instead be copied more directly by creating a consumer > and calling commitSync (as kafka-consumer-groups.sh), but this does involve > extra connections / requests and so is a little slower when trying to keep a > large number of groups in sync in real-time across clusters. > If we want to make it easier to keep consumer groups consistent, options > include allowing groups to be explicitly created with a protocol type instead > of only lazily, or for groups created outside of JoinGroupRequest the > coordinator can gain a broker config to set a default protocol type for > groups. This would default to 'consumer'. This sort of setting might be > confusing for users though, since implicitly created groups is certainly not > the norm. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
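The fix suggested above for GroupMetadataManager.readGroupMessageValue() — always reflect the persisted protocolType when rebuilding a group from __consumer_offsets, even with no active members — can be contrasted with the buggy behaviour in a few lines. Both functions are hypothetical Python models, not the Scala implementation:

```python
# Rebuilding a group's protocolType after a coordinator change.

def load_protocol_type_buggy(persisted_type: str, active_members: int) -> str:
    # Current behaviour: the type is dropped when no consumers are active,
    # degrading it to the empty string (UNKNOWN_PROTOCOL_TYPE).
    return persisted_type if active_members > 0 else ""

def load_protocol_type_fixed(persisted_type: str, active_members: int) -> str:
    # Proposed behaviour: always reflect what __consumer_offsets stores,
    # regardless of current membership.
    return persisted_type
```

With the fix, bouncing the coordinator broker no longer changes an empty group's protocol, so kafka-consumer-groups.sh keeps listing it.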
[jira] [Comment Edited] (KAFKA-6035) Avoid creating changelog topics for state stores that are directly piped to a sink topic
[ https://issues.apache.org/jira/browse/KAFKA-6035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16336798#comment-16336798 ] Richard Yu edited comment on KAFKA-6035 at 1/24/18 2:50 AM:

[~guozhang] Currently, I am unsure which class represents the changelog, because in the javadoc of {{KGroupedStream.java}} I found:

{code:java}
/**
 * Aggregate the values of records in this stream by the grouped key.
 * Records with {@code null} key or value are ignored.
 * Aggregating is a generalization of {@link #reduce(Reducer) combining via reduce(...)} as it, for example,
 * allows the result to have a different type than the input values.
 * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
 * that can be queried using the provided {@code queryableStoreName}.
 * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
 ...
 */
<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                             final Aggregator<? super K, ? super V, VR> aggregator,
                             final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
{code}

In the last line of the above excerpt, it appears that KTable is referred to as a changelog stream. Could you please help clarify this for me? Thanks.

was (Author: yohan123): [the same comment]

> Avoid creating changelog topics for state stores that are directly piped to a sink topic
>
> Key: KAFKA-6035
> URL: https://issues.apache.org/jira/browse/KAFKA-6035
> Project: Kafka
> Issue Type: Sub-task
> Components: streams
> Reporter: Guozhang Wang
> Assignee: Richard Yu
> Priority: Major
>
> Today Streams makes all state stores backed by a changelog topic by default, unless the user overrides it with {{disableLogging}} when creating the state store / materializing the KTable. However, there are a few cases where a separate changelog topic is not required because an existing topic can be re-used. This ticket summarizes a specific case that can be optimized:
> Consider the case when a KTable is materialized and then sent directly into a sink topic with the same key, e.g.
> {code}
> table1 = stream.groupBy(...).aggregate("state1").to("topic2");
> {code}
> Then we do not need to create a {{state1-changelog}} but can just use {{topic2}} as its changelog.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
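The optimization requested in this ticket can be modelled roughly: if a materialized store's output is piped straight to a sink topic under the same key, that sink topic can serve as the store's changelog, so no "<store>-changelog" topic is needed. The topology representation below is a hypothetical sketch, not the Streams internals:

```python
# Decide which topic backs a state store, per the ticket's proposal.

from typing import Optional

def changelog_topic(store_name: str,
                    sink_topic: Optional[str],
                    key_preserved: bool) -> str:
    if sink_topic is not None and key_preserved:
        # e.g. stream.groupBy(...).aggregate("state1").to("topic2"):
        # topic2 already contains every update keyed identically,
        # so it can double as the changelog.
        return sink_topic
    # Otherwise fall back to a dedicated changelog topic.
    return f"{store_name}-changelog"
```

For the example in the ticket, the store "state1" piped to "topic2" reuses "topic2", saving one internal topic and the duplicate writes to it.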
[jira] [Commented] (KAFKA-6035) Avoid creating changelog topics for state stores that are directly piped to a sink topic
[ https://issues.apache.org/jira/browse/KAFKA-6035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16336667#comment-16336667 ] Richard Yu commented on KAFKA-6035: --- If it is OK, I will be taking this one. Thanks. > Avoid creating changelog topics for state stores that are directly piped to a > sink topic > > > Key: KAFKA-6035 > URL: https://issues.apache.org/jira/browse/KAFKA-6035 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Guozhang Wang >Assignee: Richard Yu >Priority: Major > > Today Streams makes all state stores backed by a changelog topic by > default, unless the user overrides it with {{disableLogging}} when creating the > state store / materializing the KTable. However, there are a few cases where a > separate changelog topic is not required because an existing topic can be > re-used. This ticket summarizes a specific case that can be optimized: > Consider the case when a KTable is materialized and then sent directly into a > sink topic with the same key, e.g. > {code} > table1 = stream.groupBy(...).aggregate("state1").to("topic2"); > {code} > Then we do not need to create a {{state1-changelog}} but can just use > {{topic2}} as its changelog. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (KAFKA-6035) Avoid creating changelog topics for state stores that are directly piped to a sink topic
[ https://issues.apache.org/jira/browse/KAFKA-6035?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Richard Yu reassigned KAFKA-6035: - Assignee: Richard Yu > Avoid creating changelog topics for state stores that are directly piped to a > sink topic > > > Key: KAFKA-6035 > URL: https://issues.apache.org/jira/browse/KAFKA-6035 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Guozhang Wang >Assignee: Richard Yu >Priority: Major > > Today Streams makes all state stores backed by a changelog topic by > default, unless the user overrides it with {{disableLogging}} when creating the > state store / materializing the KTable. However, there are a few cases where a > separate changelog topic is not required because an existing topic can be > re-used. This ticket summarizes a specific case that can be optimized: > Consider the case when a KTable is materialized and then sent directly into a > sink topic with the same key, e.g. > {code} > table1 = stream.groupBy(...).aggregate("state1").to("topic2"); > {code} > Then we do not need to create a {{state1-changelog}} but can just use > {{topic2}} as its changelog. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6245) Enable reconfiguration of default topic configs used by brokers
[ https://issues.apache.org/jira/browse/KAFKA-6245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16336652#comment-16336652 ] ASF GitHub Bot commented on KAFKA-6245: --- rajinisivaram opened a new pull request #4466: KAFKA-6245: Dynamic update of topic config defaults URL: https://github.com/apache/kafka/pull/4466 Dynamic update of default topic configs as described in KIP-226. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) > Enable reconfiguration of default topic configs used by brokers > --- > > Key: KAFKA-6245 > URL: https://issues.apache.org/jira/browse/KAFKA-6245 > Project: Kafka > Issue Type: Sub-task > Components: core >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Major > Fix For: 1.1.0 > > > See > [KIP-226|https://cwiki.apache.org/confluence/display/KAFKA/KIP-226+-+Dynamic+Broker+Configuration] > for details. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
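The feature being added here is a precedence question: a per-topic override beats a dynamically updated broker default, which in turn beats the static default from server.properties. A minimal sketch of that lookup order, assuming plain dicts for the three config layers (names are illustrative, not broker code):

```python
# KIP-226-style config resolution: first matching layer wins.

from typing import Dict, Optional

def effective_topic_config(key: str,
                           topic_overrides: Dict[str, str],
                           dynamic_broker_defaults: Dict[str, str],
                           static_broker_defaults: Dict[str, str]) -> Optional[str]:
    for layer in (topic_overrides, dynamic_broker_defaults, static_broker_defaults):
        if key in layer:
            return layer[key]
    return None

# A dynamically updated default applies to topics with no explicit override:
value = effective_topic_config("retention.ms",
                               {},                              # no topic override
                               {"retention.ms": "86400000"},    # dynamic default
                               {"retention.ms": "604800000"})   # static default
```

Dynamic reconfiguration then means mutating the middle layer at runtime, without restarting brokers or touching per-topic overrides.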
[jira] [Updated] (KAFKA-6058) KIP-222: Add "describe consumer groups" and "list consumer groups" to KafkaAdminClient
[ https://issues.apache.org/jira/browse/KAFKA-6058?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jeff Widman updated KAFKA-6058: --- Summary: KIP-222: Add "describe consumer groups" and "list consumer groups" to KafkaAdminClient (was: Add "describe consumer groups" and "list consumer groups" to KafkaAdminClient) > KIP-222: Add "describe consumer groups" and "list consumer groups" to > KafkaAdminClient > -- > > Key: KAFKA-6058 > URL: https://issues.apache.org/jira/browse/KAFKA-6058 > Project: Kafka > Issue Type: Improvement > Components: clients >Reporter: Matthias J. Sax >Assignee: Jorge Quilcate >Priority: Major > Labels: needs-kip > > {{KafkaAdminClient}} does not allow to get information about consumer groups. > This feature is supported by old {{kafka.admin.AdminClient}} though. > We should add {{KafkaAdminClient#describeConsumerGroups()}} and > {{KafkaAdminClient#listConsumerGroup()}}. > Associated KIP: KIP-222 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6058) KIP-222: Add "describe consumer groups" and "list consumer groups" to KafkaAdminClient
[ https://issues.apache.org/jira/browse/KAFKA-6058?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16336639#comment-16336639 ] Jeff Widman commented on KAFKA-6058: Note that listing group offsets has also been added to KIP-222 > KIP-222: Add "describe consumer groups" and "list consumer groups" to > KafkaAdminClient > -- > > Key: KAFKA-6058 > URL: https://issues.apache.org/jira/browse/KAFKA-6058 > Project: Kafka > Issue Type: Improvement > Components: clients >Reporter: Matthias J. Sax >Assignee: Jorge Quilcate >Priority: Major > Labels: needs-kip > > {{KafkaAdminClient}} does not allow to get information about consumer groups. > This feature is supported by old {{kafka.admin.AdminClient}} though. > We should add {{KafkaAdminClient#describeConsumerGroups()}} and > {{KafkaAdminClient#listConsumerGroup()}}. > Associated KIP: KIP-222 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6058) Add "describe consumer groups" and "list consumer groups" to KafkaAdminClient
[ https://issues.apache.org/jira/browse/KAFKA-6058?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jeff Widman updated KAFKA-6058: --- Description: {{KafkaAdminClient}} does not allow to get information about consumer groups. This feature is supported by old {{kafka.admin.AdminClient}} though. We should add {{KafkaAdminClient#describeConsumerGroups()}} and {{KafkaAdminClient#listConsumerGroup()}}. Associated KIP: KIP-222 was: {{KafkaAdminClient}} does not allow to get information about consumer groups. This feature is supported by old {{kafka.admin.AdminClient}} though. We should add {{KafkaAdminClient#describeConsumerGroups()}} and {{KafkaAdminClient#listConsumerGroup()}}. > Add "describe consumer groups" and "list consumer groups" to KafkaAdminClient > - > > Key: KAFKA-6058 > URL: https://issues.apache.org/jira/browse/KAFKA-6058 > Project: Kafka > Issue Type: Improvement > Components: clients >Reporter: Matthias J. Sax >Assignee: Jorge Quilcate >Priority: Major > Labels: needs-kip > > {{KafkaAdminClient}} does not allow to get information about consumer groups. > This feature is supported by old {{kafka.admin.AdminClient}} though. > We should add {{KafkaAdminClient#describeConsumerGroups()}} and > {{KafkaAdminClient#listConsumerGroup()}}. > Associated KIP: KIP-222 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6244) Enable dynamic reconfiguration of log cleaners
[ https://issues.apache.org/jira/browse/KAFKA-6244?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16336617#comment-16336617 ] ASF GitHub Bot commented on KAFKA-6244: --- rajinisivaram opened a new pull request #4465: KAFKA-6244: Dynamic update of log cleaner configuration URL: https://github.com/apache/kafka/pull/4465 Log cleaner config update as described in KIP-226. Config updates are handled by stopping cleaner threads and starting new ones with the new config. This keeps the code simple and ensures that if any of the threads had terminated earlier due to an exception, new ones are created to match the configured thread count. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Enable dynamic reconfiguration of log cleaners > -- > > Key: KAFKA-6244 > URL: https://issues.apache.org/jira/browse/KAFKA-6244 > Project: Kafka > Issue Type: Sub-task > Components: core >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Major > Fix For: 1.1.0 > > > See > [KIP-226|https://cwiki.apache.org/confluence/display/KAFKA/KIP-226+-+Dynamic+Broker+Configuration] > for details. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6243) Enable reconfiguration of metrics reporters and their custom configs
[ https://issues.apache.org/jira/browse/KAFKA-6243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16336572#comment-16336572 ] ASF GitHub Bot commented on KAFKA-6243: --- rajinisivaram opened a new pull request #4464: KAFKA-6243: Enable dynamic updates of broker metrics reporters URL: https://github.com/apache/kafka/pull/4464 Dynamic metrics reporter updates described in KIP-226. This includes: - Addition and removal of metrics reporters - Reconfiguration of custom metrics reporter configs - Tests for metrics reporter updates at default cluster-level and as per-broker config for testing ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Enable reconfiguration of metrics reporters and their custom configs > > > Key: KAFKA-6243 > URL: https://issues.apache.org/jira/browse/KAFKA-6243 > Project: Kafka > Issue Type: Sub-task > Components: core >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Major > Fix For: 1.1.0 > > > See > [KIP-226|https://cwiki.apache.org/confluence/display/KAFKA/KIP-226+-+Dynamic+Broker+Configuration] > for details. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6476) Document dynamic config update
Rajini Sivaram created KAFKA-6476: - Summary: Document dynamic config update Key: KAFKA-6476 URL: https://issues.apache.org/jira/browse/KAFKA-6476 Project: Kafka Issue Type: Sub-task Components: core, documentation Reporter: Rajini Sivaram Assignee: Rajini Sivaram Fix For: 1.1.0 Add documentation for dynamic broker config update. Include: - Command line options for kafka-configs.sh with examples - Configs that can be updated along with constraints applied -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-6461) TableTableJoinIntegrationTest is unstable if caching is enabled
[ https://issues.apache.org/jira/browse/KAFKA-6461?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax resolved KAFKA-6461. Resolution: Fixed > TableTableJoinIntegrationTest is unstable if caching is enabled > --- > > Key: KAFKA-6461 > URL: https://issues.apache.org/jira/browse/KAFKA-6461 > Project: Kafka > Issue Type: Improvement > Components: streams, unit tests >Reporter: Matthias J. Sax >Assignee: Ted Yu >Priority: Minor > > {noformat} > org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > > testLeftInner[caching enabled = true] FAILED > 20:41:05 java.lang.AssertionError: Condition not met within timeout > 15000. Never received expected final result. > 20:41:05 at > org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:276) > 20:41:05 at > org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:254) > 20:41:05 at > org.apache.kafka.streams.integration.AbstractJoinIntegrationTest.runTest(AbstractJoinIntegrationTest.java:248) > 20:41:05 at > org.apache.kafka.streams.integration.TableTableJoinIntegrationTest.testLeftInner(TableTableJoinIntegrationTest.java:313){noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6473) Add MockProcessorContext to public test-utils
[ https://issues.apache.org/jira/browse/KAFKA-6473?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-6473:
--- Labels: needs-kip user-experience (was: needs-kip)
> Add MockProcessorContext to public test-utils
> -
>
> Key: KAFKA-6473
> URL: https://issues.apache.org/jira/browse/KAFKA-6473
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Affects Versions: 1.1.0
> Reporter: Matthias J. Sax
> Priority: Major
> Labels: needs-kip, user-experience
>
> With KIP-247, we added a public test-utils artifact with a TopologyTestDriver class. Using the test driver for a single Processor/Transformer/ValueTransformer requires specifying a whole topology, with a source and a sink plus the Processor/Transformer/ValueTransformer wired into it.
> For unit testing, it might be more convenient to have a MockProcessorContext that can be used to test the Processor/Transformer/ValueTransformer in isolation; i.e., the test itself creates a new Processor/Transformer/ValueTransformer object and calls init() manually, passing in the MockProcessorContext.
> This is a public API change and requires a KIP: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals
--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-3625) Move kafka-streams test fixtures into a published package
[ https://issues.apache.org/jira/browse/KAFKA-3625?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-3625: --- Labels: kip user-experience (was: needs-kip user-experience) > Move kafka-streams test fixtures into a published package > - > > Key: KAFKA-3625 > URL: https://issues.apache.org/jira/browse/KAFKA-3625 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Jeff Klukas >Assignee: Matthias J. Sax >Priority: Minor > Labels: kip, user-experience > Fix For: 1.1.0 > > > The KStreamTestDriver and related fixtures defined in > streams/src/test/java/org/apache/kafka/test would be useful to developers > building applications on top of Kafka Streams, but they are not currently > exposed in a package. > I propose moving this directory to live under streams/fixtures/src/main and > creating a new 'streams:fixtures' project in the gradle configuration to > publish these as a separate package. > KIP: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-247%3A+Add+public+test+utils+for+Kafka+Streams -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6461) TableTableJoinIntegrationTest is unstable if caching is enabled
[ https://issues.apache.org/jira/browse/KAFKA-6461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16336529#comment-16336529 ] ASF GitHub Bot commented on KAFKA-6461: --- guozhangwang closed pull request #4451: KAFKA-6461 TableTableJoinIntegrationTest is unstable if caching is enabled URL: https://github.com/apache/kafka/pull/4451 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java index f3eceb09735..b5e6fcb63b9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java @@ -79,7 +79,7 @@ public void prepareTopology() throws InterruptedException { @Override public void apply(final Long key, final String value) { numRecordsExpected++; -if (value.equals(expected)) { +if (expected.equals(value)) { boolean ret = finalResultReached.compareAndSet(false, true); if (!ret) { This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > TableTableJoinIntegrationTest is unstable if caching is enabled > --- > > Key: KAFKA-6461 > URL: https://issues.apache.org/jira/browse/KAFKA-6461 > Project: Kafka > Issue Type: Improvement > Components: streams, unit tests >Reporter: Matthias J. 
Sax >Assignee: Ted Yu >Priority: Minor > > {noformat} > org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > > testLeftInner[caching enabled = true] FAILED > 20:41:05 java.lang.AssertionError: Condition not met within timeout > 15000. Never received expected final result. > 20:41:05 at > org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:276) > 20:41:05 at > org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:254) > 20:41:05 at > org.apache.kafka.streams.integration.AbstractJoinIntegrationTest.runTest(AbstractJoinIntegrationTest.java:248) > 20:41:05 at > org.apache.kafka.streams.integration.TableTableJoinIntegrationTest.testLeftInner(TableTableJoinIntegrationTest.java:313){noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
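The one-line fix in the diff above swaps the receiver of equals. The point, shown here with plain strings (the names are illustrative, not Kafka's): calling equals on a value that may still be null throws a NullPointerException, whereas flipping the receiver to the known-non-null constant, or using Objects.equals, is null-safe.

```java
// Why the one-line diff matters: under caching, `value` may still be null
// when the callback fires, and value.equals(expected) would then throw.
import java.util.Objects;

public class NullSafeEquals {
    public static void main(String[] args) {
        String expected = "done";
        String value = null; // e.g. the final result has not arrived yet

        // value.equals(expected)  <- would throw NullPointerException here
        System.out.println(expected.equals(value));          // prints false
        System.out.println(Objects.equals(expected, value)); // prints false
    }
}
```

The same reasoning applies to any callback that compares a possibly-absent record against an expected constant: put the constant on the left.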
[jira] [Commented] (KAFKA-6471) seekToEnd and seek give unclear results for Consumer with read_committed isolation level
[ https://issues.apache.org/jira/browse/KAFKA-6471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16336483#comment-16336483 ] Jason Gustafson commented on KAFKA-6471:
Thanks for reporting the issue. It would be helpful to understand your use case better. Why is it useful to distinguish between the last committed offset and the last stable offset?
> seekToEnd and seek give unclear results for Consumer with read_committed isolation level
>
> Key: KAFKA-6471
> URL: https://issues.apache.org/jira/browse/KAFKA-6471
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 1.0.0
> Reporter: Coen Damen
> Priority: Major
>
> I am using the transactional KafkaProducer to send messages to a topic. This works fine. I use a KafkaConsumer with the read_committed isolation level, and I have an issue with the seek and seekToEnd methods. According to the documentation, the seek and seekToEnd methods give me the LSO (Last Stable Offset). But this is a bit confusing, as it always gives me the same value: the END of the topic, no matter whether the last entry was committed (by the Producer) or is part of an aborted transaction. For example, after I abort the last 5 attempts to insert 20_000 messages each, the last 100_000 records should not be read by the Consumer. But seekToEnd still moves to the end of the Topic (past the 100_000 messages), although poll() does not return them.
> I am looking for a way to retrieve the Last Committed Offset (i.e. the last message successfully committed by the Producer). There seems to be no proper API method for this. So do I need to roll my own?
> One option would be to move back and poll until no more records are retrieved, which would yield the last committed message. But I would assume that Kafka provides this method.
> We use Kafka 1.0.0.
--
This message was sent by Atlassian JIRA (v7.6.3#76005)
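The "roll my own" workaround mentioned in the issue above can be sketched as pure logic. This is a hypothetical illustration, not a Kafka API: consumer records do not carry a committed flag, so a real implementation would instead have to step backwards with seek() and poll() until a record is actually returned. The simulated record type below stands in for that.

```java
// Hypothetical sketch of "find the last committed offset by scanning from
// the end, skipping aborted records". Rec is a made-up stand-in type; Kafka
// itself does not expose per-record transaction status to the consumer.
import java.util.List;

public class LastCommitted {
    // Simulated record: an offset plus whether its transaction committed.
    record Rec(long offset, boolean committed) {}

    // Walk backwards from the end; the first committed record wins.
    // Returns -1 if no committed record exists in the partition.
    static long lastCommittedOffset(List<Rec> partition) {
        for (int i = partition.size() - 1; i >= 0; i--) {
            if (partition.get(i).committed()) {
                return partition.get(i).offset();
            }
        }
        return -1L;
    }

    public static void main(String[] args) {
        List<Rec> recs = List.of(
                new Rec(0, true), new Rec(1, true),
                new Rec(2, false), new Rec(3, false)); // trailing aborted batch
        System.out.println(lastCommittedOffset(recs)); // prints 1
    }
}
```

In the real consumer, the equivalent loop would seek a window before the LSO, poll, and remember the offset of the last record returned, since read_committed poll() already filters aborted data.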
[jira] [Created] (KAFKA-6475) ConfigException on the broker results in UnknownServerException in the admin client
Xavier Léauté created KAFKA-6475: Summary: ConfigException on the broker results in UnknownServerException in the admin client Key: KAFKA-6475 URL: https://issues.apache.org/jira/browse/KAFKA-6475 Project: Kafka Issue Type: Bug Affects Versions: 1.0.0 Reporter: Xavier Léauté Assignee: Colin P. McCabe Calling AdminClient.alterConfigs with an invalid configuration may cause ConfigException to be thrown on the broker side, which results in an UnknownServerException thrown by the admin client. It would probably make more sense for the admin client to throw InvalidConfigurationException in that case. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
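The fix the ticket above asks for is an error-mapping change: a broker-side validation failure should surface to the admin client as a specific error rather than the catch-all "unknown server error". A minimal sketch of that idea, with simplified stand-in names (these are not Kafka's actual internal classes):

```java
// Hypothetical sketch of mapping a broker-side exception to a specific API
// error code. ApiError and ConfigException here are simplified stand-ins.
public class ErrorMapping {
    enum ApiError { NONE, INVALID_CONFIG, UNKNOWN_SERVER_ERROR }

    static class ConfigException extends RuntimeException {
        ConfigException(String msg) { super(msg); }
    }

    // Before the fix: every unexpected Throwable collapses to
    // UNKNOWN_SERVER_ERROR. After: known validation failures get their own code.
    static ApiError toApiError(Throwable t) {
        if (t == null) return ApiError.NONE;
        if (t instanceof ConfigException) return ApiError.INVALID_CONFIG;
        return ApiError.UNKNOWN_SERVER_ERROR;
    }

    public static void main(String[] args) {
        System.out.println(toApiError(new ConfigException("bad value"))); // INVALID_CONFIG
        System.out.println(toApiError(new IllegalStateException()));      // UNKNOWN_SERVER_ERROR
    }
}
```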
[jira] [Commented] (KAFKA-5660) Don't throw TopologyBuilderException during runtime
[ https://issues.apache.org/jira/browse/KAFKA-5660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16336402#comment-16336402 ] Nick Afshartous commented on KAFKA-5660:
I'd like to take this as a starter task.
> Don't throw TopologyBuilderException during runtime
> ---
>
> Key: KAFKA-5660
> URL: https://issues.apache.org/jira/browse/KAFKA-5660
> Project: Kafka
> Issue Type: Task
> Components: streams
> Affects Versions: 0.11.0.0
> Reporter: Matthias J. Sax
> Priority: Major
>
> {{TopologyBuilderException}} is a pre-runtime exception that should only be thrown before {{KafkaStreams#start()}} is called.
> However, we do throw {{TopologyBuilderException}} within
> - `SourceNodeFactory#getTopics`
> - `ProcessorContextImpl#getStateStore`
> - `StreamPartitionAssignor#prepareTopic `
> (and maybe somewhere else: we should double-check if there are other places in the code like those).
> We should replace those exceptions with either {{StreamsException}} or with a new exception type.
--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6472) WordCount example code error
[ https://issues.apache.org/jira/browse/KAFKA-6472?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-6472:
--- Priority: Trivial (was: Major)
> WordCount example code error
>
> Key: KAFKA-6472
> URL: https://issues.apache.org/jira/browse/KAFKA-6472
> Project: Kafka
> Issue Type: Bug
> Components: documentation, streams
> Affects Versions: 0.11.0.2
> Reporter: JONYhao
> Assignee: Joel Hamill
> Priority: Trivial
> Labels: newbie
> Original Estimate: 168h
> Remaining Estimate: 168h
>
> There is a "(" missing in the WordCount example tutorial [https://kafka.apache.org/10/documentation/streams/tutorial], at the end of the page, line 31:
> {{.to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long());}}
> should be
> {{.to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));}}
--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (KAFKA-6472) WordCount example code error
[ https://issues.apache.org/jira/browse/KAFKA-6472?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax reassigned KAFKA-6472:
-- Assignee: Joel Hamill
> WordCount example code error
>
> Key: KAFKA-6472
> URL: https://issues.apache.org/jira/browse/KAFKA-6472
> Project: Kafka
> Issue Type: Bug
> Components: documentation, streams
> Affects Versions: 0.11.0.2
> Reporter: JONYhao
> Assignee: Joel Hamill
> Priority: Major
> Labels: newbie
> Original Estimate: 168h
> Remaining Estimate: 168h
>
> There is a "(" missing in the WordCount example tutorial [https://kafka.apache.org/10/documentation/streams/tutorial], at the end of the page, line 31:
> {{.to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long());}}
> should be
> {{.to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));}}
--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6472) WordCount example code error
[ https://issues.apache.org/jira/browse/KAFKA-6472?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-6472:
--- Flags: (was: Important)
> WordCount example code error
>
> Key: KAFKA-6472
> URL: https://issues.apache.org/jira/browse/KAFKA-6472
> Project: Kafka
> Issue Type: Bug
> Components: documentation, streams
> Affects Versions: 0.11.0.2
> Reporter: JONYhao
> Assignee: Joel Hamill
> Priority: Trivial
> Labels: newbie
> Original Estimate: 168h
> Remaining Estimate: 168h
>
> There is a "(" missing in the WordCount example tutorial [https://kafka.apache.org/10/documentation/streams/tutorial], at the end of the page, line 31:
> {{.to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long());}}
> should be
> {{.to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));}}
--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6474) Rewrite test to use new public TopologyTestDriver
Matthias J. Sax created KAFKA-6474:
-- Summary: Rewrite test to use new public TopologyTestDriver
Key: KAFKA-6474
URL: https://issues.apache.org/jira/browse/KAFKA-6474
Project: Kafka
Issue Type: Improvement
Components: streams, unit tests
Affects Versions: 1.1.0
Reporter: Matthias J. Sax
With KIP-247 we added the public TopologyTestDriver. We should rewrite our own tests to use this new test driver and remove the two classes ProcessorTopologyTestDriver and KStreamTestDriver.
--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6473) Add MockProcessorContext to public test-utils
Matthias J. Sax created KAFKA-6473:
-- Summary: Add MockProcessorContext to public test-utils
Key: KAFKA-6473
URL: https://issues.apache.org/jira/browse/KAFKA-6473
Project: Kafka
Issue Type: Improvement
Components: streams
Affects Versions: 1.1.0
Reporter: Matthias J. Sax
With KIP-247, we added a public test-utils artifact with a TopologyTestDriver class. Using the test driver for a single Processor/Transformer/ValueTransformer requires specifying a whole topology, with a source and a sink plus the Processor/Transformer/ValueTransformer wired into it.
For unit testing, it might be more convenient to have a MockProcessorContext that can be used to test the Processor/Transformer/ValueTransformer in isolation; i.e., the test itself creates a new Processor/Transformer/ValueTransformer object and calls init() manually, passing in the MockProcessorContext.
This is a public API change and requires a KIP: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals
--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6469) ISR change notification queue can prevent controller from making progress
[ https://issues.apache.org/jira/browse/KAFKA-6469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16336114#comment-16336114 ] Jeff Widman commented on KAFKA-6469:
We hit a similar issue when doing partition re-assignments across the cluster and the total payload was greater than 1MB... We solved it by raising the jute.maxbuffer limit to several MB.
> ISR change notification queue can prevent controller from making progress
> -
>
> Key: KAFKA-6469
> URL: https://issues.apache.org/jira/browse/KAFKA-6469
> Project: Kafka
> Issue Type: Bug
> Reporter: Kyle Ambroff-Kao
> Assignee: Kyle Ambroff-Kao
> Priority: Major
>
> When writes to /isr_change_notification in ZooKeeper (which is effectively a queue of ISR change events for the controller) happen at a rate high enough that the node with a watch can't dequeue them, the trouble starts.
> The watcher kafka.controller.IsrChangeNotificationListener is fired in the controller when a new entry is written to /isr_change_notification, and the zkclient library sends a GetChildrenRequest to ZooKeeper to fetch all child znodes.
> We've seen failures in one of our test clusters as the partition count started to climb north of 60k per broker. We had brokers writing child nodes under /isr_change_notification that were larger than the jute.maxbuffer size in ZooKeeper (1MB), causing the ZooKeeper server to drop the controller's session, effectively bricking the cluster.
> This can be partially mitigated by chunking ISR notifications to increase the maximum number of partitions a broker can host.
--
This message was sent by Atlassian JIRA (v7.6.3#76005)
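The workaround Jeff describes is a JVM system property, not a Kafka broker config. A sketch of how it is typically set; the 4 MB value is purely illustrative, not a recommendation:

```shell
# jute.maxbuffer must be raised on BOTH sides of the ZooKeeper connection:
# the ZooKeeper server JVMs and the ZooKeeper clients (the broker JVMs).
# 4194304 bytes = 4 MB, an example value only.
export SERVER_JVMFLAGS="-Djute.maxbuffer=4194304"   # picked up by zkServer.sh
export KAFKA_OPTS="-Djute.maxbuffer=4194304"        # picked up by kafka-server-start.sh
```

Raising the limit only buys headroom; the chunking of ISR notifications mentioned in the issue is the actual fix.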
[jira] [Updated] (KAFKA-6472) WordCount example code error
[ https://issues.apache.org/jira/browse/KAFKA-6472?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-6472:
- Component/s: streams
> WordCount example code error
>
> Key: KAFKA-6472
> URL: https://issues.apache.org/jira/browse/KAFKA-6472
> Project: Kafka
> Issue Type: Bug
> Components: documentation, streams
> Affects Versions: 0.11.0.2
> Reporter: JONYhao
> Priority: Major
> Labels: newbie
> Original Estimate: 168h
> Remaining Estimate: 168h
>
> There is a "(" missing in the WordCount example tutorial [https://kafka.apache.org/10/documentation/streams/tutorial], at the end of the page, line 31:
> {{.to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long());}}
> should be
> {{.to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));}}
--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6472) WordCount example code error
[ https://issues.apache.org/jira/browse/KAFKA-6472?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-6472:
- Labels: newbie (was: maven)
> WordCount example code error
>
> Key: KAFKA-6472
> URL: https://issues.apache.org/jira/browse/KAFKA-6472
> Project: Kafka
> Issue Type: Bug
> Components: documentation
> Affects Versions: 0.11.0.2
> Reporter: JONYhao
> Priority: Major
> Labels: newbie
> Original Estimate: 168h
> Remaining Estimate: 168h
>
> There is a "(" missing in the WordCount example tutorial [https://kafka.apache.org/10/documentation/streams/tutorial], at the end of the page, line 31:
> {{.to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long());}}
> should be
> {{.to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));}}
--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6470) no continuous offset for function seek
[ https://issues.apache.org/jira/browse/KAFKA-6470?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] chao.wu updated KAFKA-6470:
--- Description:
A topic-partition "adn-tracking,15" in Kafka whose earliest offset is 1255644602 and latest offset is 1271253441.
While starting a Spark streaming job to process the data from the topic, we got an exception with "Got wrong record even after seeking to offset 1266921577".
I implemented a simple project that uses a consumer to seek to offset 1266921577, but it returns offset 1266921578. Seeking to 1266921576 returns 1266921576 exactly.
Why? How to fix that?
Here is the code:
public class consumerDemo {
    public static void main(String[] argv) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "172.31.29.31:9091");
        props.put("group.id", "consumer-tutorial-demo");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        TopicPartition tp = new TopicPartition("adn-tracking-click", 15);
        Collection<TopicPartition> collection = new ArrayList<>();
        collection.add(tp);
        consumer.assign(collection);
        consumer.seek(tp, 1266921576);
        ConsumerRecords<String, String> consumerRecords = consumer.poll(1);
        List<ConsumerRecord<String, String>> listR = consumerRecords.records(tp);
        Iterator<ConsumerRecord<String, String>> iter = listR.iterator();
        ConsumerRecord<String, String> record = iter.next();
        System.out.println(" the next record " + record.offset() + " record topic " + record.topic());
    }
}

was:
A topic-partition "adn-tracking,15" in Kafka whose earliest offset is 1255644602 and latest offset is 1271253441. While starting a Spark streaming job to process the data from the topic, we got an exception with "Got wrong record even after seeking to offset 1266921577". I implemented a simple project that uses a consumer to seek to offset 1266921577, but it returns offset 1266921578. Seeking to 1266921576 returns 1266921576 exactly. Why? How to fix that?
Here is the code:
public class consumerDemo {
    public static void main(String[] argv) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "172.31.29.31:9091");
        props.put("group.id", "consumer-tutorial-demo");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        TopicPartition tp = new TopicPartition("adn-tracking-click", 15);
        Collection<TopicPartition> collection = new ArrayList<>();
        collection.add(tp);
        consumer.assign(collection);
        consumer.seek(tp, 1266921576);
        ConsumerRecords<String, String> consumerRecords = consumer.poll(1);
        List<ConsumerRecord<String, String>> listR = consumerRecords.records(tp);
        Iterator<ConsumerRecord<String, String>> iter = listR.iterator();
        ConsumerRecord<String, String> record = iter.next();
        System.out.println(" the next record " + record.offset() + " record topic " + record.topic());
    }
}

> no continuous offset for function seek
> --
>
> Key: KAFKA-6470
> URL: https://issues.apache.org/jira/browse/KAFKA-6470
> Project: Kafka
> Issue Type: Bug
> Components: clients, core
> Affects Versions: 0.10.0.1
> Reporter: chao.wu
> Priority: Major
>
> A topic-partition "adn-tracking,15" in Kafka whose earliest offset is 1255644602 and latest offset is 1271253441.
> While starting a Spark streaming job to process the data from the topic, we got an exception with "Got wrong record even after seeking to offset 1266921577".
> I implemented a simple project that uses a consumer to seek to offset 1266921577, but it returns offset 1266921578. Seeking to 1266921576 returns 1266921576 exactly.
> Why? How to fix that?
>
> Here is the code:
> public class consumerDemo {
>     public static void main(String[] argv) {
>         Properties props = new Properties();
>         props.put("bootstrap.servers", "172.31.29.31:9091");
>         props.put("group.id", "consumer-tutorial-demo");
>         props.put("key.deserializer", StringDeserializer.class.getName());
>         props.put("value.deserializer", StringDeserializer.class.getName());
>         KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
>         TopicPartition tp = new TopicPartition("adn-tracking-click", 15);
>         Collection<TopicPartition> collection = new ArrayList<>();
>         collection.add(tp);
>         consumer.assign(collection);
>         consumer.seek(tp, 1266921576);
>         ConsumerRecords<String, String> consumerRecords = consumer.poll(1);
>         List<ConsumerRecord<String, String>> listR =
[jira] [Created] (KAFKA-6472) WordCount example code error
JONYhao created KAFKA-6472:
-- Summary: WordCount example code error
Key: KAFKA-6472
URL: https://issues.apache.org/jira/browse/KAFKA-6472
Project: Kafka
Issue Type: Bug
Components: documentation
Affects Versions: 0.11.0.2
Reporter: JONYhao
There is a "(" missing in the WordCount example tutorial [https://kafka.apache.org/10/documentation/streams/tutorial], at the end of the page, line 31:
{{.to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long());}}
should be
{{.to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));}}
--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6471) seekToEnd and seek give unclear results for Consumer with read_committed isolation level
Coen Damen created KAFKA-6471:
- Summary: seekToEnd and seek give unclear results for Consumer with read_committed isolation level
Key: KAFKA-6471
URL: https://issues.apache.org/jira/browse/KAFKA-6471
Project: Kafka
Issue Type: Bug
Affects Versions: 1.0.0
Reporter: Coen Damen
I am using the transactional KafkaProducer to send messages to a topic. This works fine. I use a KafkaConsumer with the read_committed isolation level, and I have an issue with the seek and seekToEnd methods. According to the documentation, the seek and seekToEnd methods give me the LSO (Last Stable Offset). But this is a bit confusing, as it always gives me the same value: the END of the topic, no matter whether the last entry was committed (by the Producer) or is part of an aborted transaction. For example, after I abort the last 5 attempts to insert 20_000 messages each, the last 100_000 records should not be read by the Consumer. But seekToEnd still moves to the end of the Topic (past the 100_000 messages), although poll() does not return them.
I am looking for a way to retrieve the Last Committed Offset (i.e. the last message successfully committed by the Producer). There seems to be no proper API method for this. So do I need to roll my own?
One option would be to move back and poll until no more records are retrieved, which would yield the last committed message. But I would assume that Kafka provides this method.
We use Kafka 1.0.0.
--
This message was sent by Atlassian JIRA (v7.6.3#76005)