[jira] [Comment Edited] (KAFKA-6471) seekToEnd and seek give unclear results for Consumer with read_committed isolation level

2018-01-23 Thread Coen Damen (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16337075#comment-16337075
 ] 

Coen Damen edited comment on KAFKA-6471 at 1/24/18 7:40 AM:


Hi Jason, thanks for your reply.

The Use Case is the following.

We are retrieving log files from a machine. The log records are transformed 
into Kafka messages. Writing a log file into Kafka is atomic: in case of a read 
failure (of a file) or a write failure (to Kafka), the transaction that writes 
the messages to Kafka should be aborted and tried again.

When retrying, or when restarting the "job" after it has been idle for a longer 
time, we want to resume where processing was halted, i.e. at the last 
successfully processed file. For this I expected to use seekToEnd with a 
Consumer configured with read_committed. But it moved to the end of the Topic, 
even though the Topic contained many aborted messages at the end.

Note: the filename and the index within the file are part of the message, so we 
want to retrieve the last successful message and extract the filename from it.

Thank you,

Coen

 


was (Author: coenos):
Hi Jason, thanks for your reply.

The Use Case is the following.

We are retrieving log files from a machine. The log records are transformed 
into Kafka messages. Writing a log file into Kafka is atomic: in case of a read 
failure (of a file) or a write failure (to Kafka), the transaction that writes 
the messages to Kafka should be aborted and tried again.

When retrying, or when restarting the "job" after it has been idle for a longer 
time, we want to resume where processing was halted, i.e. at the last 
successfully processed file. For this I expected to use seekToEnd with a 
Consumer configured with read_committed. But it moved to the end of the Topic, 
even though the Topic contained many aborted messages at the end.

Thank you,

Coen

 

> seekToEnd and seek give unclear results for Consumer with read_committed 
> isolation level
> 
>
> Key: KAFKA-6471
> URL: https://issues.apache.org/jira/browse/KAFKA-6471
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.0.0
>Reporter: Coen Damen
>Priority: Major
>
> I am using the transactional KafkaProducer to send messages to a topic. This 
> works fine. I use a KafkaConsumer with read_committed isolation level and I 
> have an issue with the seek and seekToEnd methods. According to the 
> documentation, the seek and seekToEnd methods give me the LSO (Last Stable 
> Offset). But this is a bit confusing, as it always gives me the same value: 
> the END of the topic, no matter whether the last entry was committed (by the 
> Producer) or part of an aborted transaction. For example, after I abort the 
> last 5 attempts to insert 20_000 messages each, the last 100_000 records 
> should not be read by the Consumer. Yet seekToEnd moves to the end of the 
> Topic (including the 100_000 messages), although poll() does not return them.
> I am looking for a way to retrieve the Last Committed Offset (so the last 
> message successfully committed by the Producer). There seems to be no proper 
> API method for this. So do I need to roll my own?
> One option would be to move back and poll until no more records are 
> retrieved; this would yield the last committed message. But I would assume 
> that Kafka provides such a method.
> We use Kafka 1.0.0.
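
For illustration, a minimal sketch of the "seek back and poll" workaround described above, written against the 1.0 consumer API. The bootstrap server, topic, group id, and rewind window are assumptions; it only finds the last committed record if that record lies within the window, and it is not a substitute for a proper last-committed-offset API.

{code:java}
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Collections;
import java.util.Properties;

public class LastCommittedRecordProbe {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // assumed
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "lso-probe");                 // assumed
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        TopicPartition tp = new TopicPartition("machine-logs", 0);              // assumed
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(Collections.singletonList(tp));

            // seekToEnd() positions at the LSO, which lies past any aborted tail records.
            consumer.seekToEnd(Collections.singletonList(tp));
            long lso = consumer.position(tp);

            // Rewind a fixed window and poll forward; read_committed filters out
            // aborted records, so the last record returned is the last committed one
            // (assuming it falls inside the window).
            long windowStart = Math.max(0L, lso - 1_000L);
            consumer.seek(tp, windowStart);

            ConsumerRecord<String, String> lastCommitted = null;
            while (consumer.position(tp) < lso) {
                ConsumerRecords<String, String> records = consumer.poll(1000L);
                for (ConsumerRecord<String, String> record : records) {
                    lastCommitted = record;
                }
            }
            if (lastCommitted == null) {
                System.out.println("no committed records in the rewind window");
            } else {
                System.out.printf("last committed offset %d, value: %s%n",
                        lastCommitted.offset(), lastCommitted.value());
            }
        }
    }
}
{code}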



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6471) seekToEnd and seek give unclear results for Consumer with read_committed isolation level

2018-01-23 Thread Coen Damen (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16337075#comment-16337075
 ] 

Coen Damen commented on KAFKA-6471:
---

Hi Jason, thanks for your reply.

The Use Case is the following.

We are retrieving log files from a machine. The log records are transformed 
into Kafka messages. Writing a log file into Kafka is atomic: in case of a read 
failure (of a file) or a write failure (to Kafka), the transaction that writes 
the messages to Kafka should be aborted and tried again.

When retrying, or when restarting the "job" after it has been idle for a longer 
time, we want to resume where processing was halted, i.e. at the last 
successfully processed file. For this I expected to use seekToEnd with a 
Consumer configured with read_committed. But it moved to the end of the Topic, 
even though the Topic contained many aborted messages at the end.

Thank you,

Coen

 

> seekToEnd and seek give unclear results for Consumer with read_committed 
> isolation level
> 
>
> Key: KAFKA-6471
> URL: https://issues.apache.org/jira/browse/KAFKA-6471
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.0.0
>Reporter: Coen Damen
>Priority: Major
>
> I am using the transactional KafkaProducer to send messages to a topic. This 
> works fine. I use a KafkaConsumer with read_committed isolation level and I 
> have an issue with the seek and seekToEnd methods. According to the 
> documentation, the seek and seekToEnd methods give me the LSO (Last Stable 
> Offset). But this is a bit confusing, as it always gives me the same value: 
> the END of the topic, no matter whether the last entry was committed (by the 
> Producer) or part of an aborted transaction. For example, after I abort the 
> last 5 attempts to insert 20_000 messages each, the last 100_000 records 
> should not be read by the Consumer. Yet seekToEnd moves to the end of the 
> Topic (including the 100_000 messages), although poll() does not return them.
> I am looking for a way to retrieve the Last Committed Offset (so the last 
> message successfully committed by the Producer). There seems to be no proper 
> API method for this. So do I need to roll my own?
> One option would be to move back and poll until no more records are 
> retrieved; this would yield the last committed message. But I would assume 
> that Kafka provides such a method.
> We use Kafka 1.0.0.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5142) KIP-145 - Expose Record Headers in Kafka Connect

2018-01-23 Thread Randall Hauch (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5142?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16337069#comment-16337069
 ] 

Randall Hauch commented on KAFKA-5142:
--

KIP-145 has passed.

> KIP-145 - Expose Record Headers in Kafka Connect
> 
>
> Key: KAFKA-5142
> URL: https://issues.apache.org/jira/browse/KAFKA-5142
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients
>Reporter: Michael Andre Pearce
>Assignee: Michael Andre Pearce
>Priority: Major
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-145+-+Expose+Record+Headers+in+Kafka+Connect
> As KIP-82 introduced Headers into the core Kafka Product, it would be 
> advantageous to expose them in the Kafka Connect Framework.
> Connectors that replicate data between Kafka clusters or between other 
> messaging products and Kafka would want to replicate the headers.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6468) Replication high watermark checkpoint file read for every LeaderAndIsrRequest

2018-01-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16336969#comment-16336969
 ] 

ASF GitHub Bot commented on KAFKA-6468:
---

ambroff opened a new pull request #4468: KAFKA-6468 Read 
replication-offset-checkpoint once
URL: https://github.com/apache/kafka/pull/4468
 
 
   Only read the high watermark checkpoint
   file (replication-offset-checkpoint) once. Before this patch, this file
   is read every time the broker handles LeaderAndIsrRequest. See
   kafka.cluster.Partition#getOrCreateReplica(Int, Boolean).
   
   On my local test cluster of three brokers with around 40k partitions,
   the initial LeaderAndIsrRequest refers to every partition in the
   cluster, and it can take 20 to 30 minutes to create all of the replicas
   because the replication-offset-checkpoint is nearly 2MB.
   
   Changing this code so that we only read this file once on startup
   reduces the time to create all replicas to around one minute.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Replication high watermark checkpoint file read for every LeaderAndIsrRequest
> -
>
> Key: KAFKA-6468
> URL: https://issues.apache.org/jira/browse/KAFKA-6468
> Project: Kafka
>  Issue Type: Bug
>Reporter: Kyle Ambroff-Kao
>Assignee: Kyle Ambroff-Kao
>Priority: Major
>
> The high watermark for each partition in a given log directory is written to 
> disk every _replica.high.watermark.checkpoint.interval.ms_ milliseconds. This 
> checkpoint file is used to create replicas when joining the cluster.
> [https://github.com/apache/kafka/blob/b73c765d7e172de4742a3aa023d5a0a4b7387247/core/src/main/scala/kafka/cluster/Partition.scala#L180]
> Unfortunately this file is read every time 
> kafka.cluster.Partition#getOrCreateReplica is invoked. For most clusters this 
> isn't a big deal, but for a small cluster with lots of partitions all of the 
> reads of this file really add up.
> On my local test cluster of three brokers with around 40k partitions, the 
> initial LeaderAndIsrRequest refers to every partition in the cluster, and it 
> can take 20 to 30 minutes to create all of the replicas because the 
> _replication-offset-checkpoint_ is nearly 2MB.
> Changing this code so that we only read this file once on startup reduces the 
> time to create all replicas to around one minute.
> Credit to [~onurkaraman] for finding this one.
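
For illustration, a hedged Java sketch of the caching pattern described above: parse the checkpoint file once at startup and answer later lookups from memory instead of re-reading the file for every partition in a LeaderAndIsrRequest. The class name is made up and the parsing is simplified (the real replication-offset-checkpoint also starts with a version line and an entry count, and the actual fix lives in the broker's Scala code).

{code:java}
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class HighWatermarkCheckpointCache {

    private final Map<String, Long> highWatermarks = new ConcurrentHashMap<>();

    // Read the checkpoint file exactly once; every later lookup is a map access.
    public HighWatermarkCheckpointCache(Path checkpointFile) throws IOException {
        List<String> lines = Files.readAllLines(checkpointFile);
        for (String line : lines) {
            String[] parts = line.trim().split("\\s+");
            if (parts.length == 3) {                 // "topic partition offset" entries
                highWatermarks.put(parts[0] + "-" + parts[1], Long.parseLong(parts[2]));
            }
        }
    }

    // Called when creating a replica; no disk I/O after construction.
    public long highWatermark(String topic, int partition) {
        return highWatermarks.getOrDefault(topic + "-" + partition, 0L);
    }
}
{code}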



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5540) Deprecate and remove internal converter configs

2018-01-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5540?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16336964#comment-16336964
 ] 

ASF GitHub Bot commented on KAFKA-5540:
---

umesh9794 opened a new pull request #4467: KAFKA-5540 : Deprecate and remove 
internal converter configs
URL: https://github.com/apache/kafka/pull/4467
 
 
   This PR deprecates the following configs: 
   
   `internal.key.converter`
   `internal.value.converter`
   
   And removes their usage from connect-*.properties files.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Deprecate and remove internal converter configs
> ---
>
> Key: KAFKA-5540
> URL: https://issues.apache.org/jira/browse/KAFKA-5540
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.11.0.0
>Reporter: Ewen Cheslack-Postava
>Priority: Major
>  Labels: needs-kip
>
> The internal.key.converter and internal.value.converter were originally 
> exposed as configs because a) they are actually pluggable and b) providing a 
> default would require relying on the JsonConverter always being available, 
> which, before we had classloader isolation, might possibly have been removed 
> for compatibility reasons.
> However, this has ultimately just caused a lot more trouble and confusion 
> than it is worth. We should deprecate the configs, give them a default of 
> JsonConverter (which is also kind of nice since it results in human-readable 
> data in the internal topics), and then ultimately remove them in the next 
> major version.
> These are all public APIs so this will need a small KIP before we can make 
> the change.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6477) Add Support for Quorum-based Producer Acknowledgment

2018-01-23 Thread Litao Deng (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6477?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Litao Deng updated KAFKA-6477:
--
Description: 
Hey folks. I would like to add a feature to support quorum-based acknowledgment 
for producer requests. We have been running a modified version of Kafka on our 
testing cluster for weeks, and the P999 improvement is significant, with very 
stable latency. Additionally, I have a proposal to achieve similar data 
durability to the insync.replicas-based acknowledgment through LEO-based leader 
election.

[https://cwiki.apache.org/confluence/display/KAFKA/KIP-250+Add+Support+for+Quorum-based+Producer+Acknowledge]

  was:
Hey folks. I would like to add a feature to support quorum-based acknowledgment 
for producer requests. We have been running a modified version on our testing 
cluster, and the P999 improvement is significant, with very stable latency. 
Additionally, I have a proposal to achieve similar data durability to the 
insync.replicas-based acknowledgment through LEO-based leader election.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-250+Add+Support+for+Quorum-based+Producer+Acknowledge


> Add Support for Quorum-based Producer Acknowledgment
> 
>
> Key: KAFKA-6477
> URL: https://issues.apache.org/jira/browse/KAFKA-6477
> Project: Kafka
>  Issue Type: New Feature
>  Components: controller, producer 
>Reporter: Litao Deng
>Priority: Major
>
> Hey folks. I would like to add a feature to support quorum-based 
> acknowledgment for producer requests. We have been running a modified version 
> of Kafka on our testing cluster for weeks, and the P999 improvement is 
> significant, with very stable latency. Additionally, I have a proposal to 
> achieve similar data durability to the insync.replicas-based acknowledgment 
> through LEO-based leader election.
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-250+Add+Support+for+Quorum-based+Producer+Acknowledge]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6477) Add Support for Quorum-based Producer Acknowledgment

2018-01-23 Thread Litao Deng (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6477?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Litao Deng updated KAFKA-6477:
--
Summary: Add Support for Quorum-based Producer Acknowledgment  (was: Add 
Support for Quorum-based Producer Acknowledge)

> Add Support for Quorum-based Producer Acknowledgment
> 
>
> Key: KAFKA-6477
> URL: https://issues.apache.org/jira/browse/KAFKA-6477
> Project: Kafka
>  Issue Type: New Feature
>  Components: controller, producer 
>Reporter: Litao Deng
>Priority: Major
>
> Hey folks. I would like to add a feature to support quorum-based 
> acknowledgment for producer requests. We have been running a modified version 
> on our testing cluster, and the P999 improvement is significant, with very 
> stable latency. Additionally, I have a proposal to achieve similar data 
> durability to the insync.replicas-based acknowledgment through LEO-based 
> leader election.
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-250+Add+Support+for+Quorum-based+Producer+Acknowledge



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6477) Add Support for Quorum-based Producer Acknowledge

2018-01-23 Thread Litao Deng (JIRA)
Litao Deng created KAFKA-6477:
-

 Summary: Add Support for Quorum-based Producer Acknowledge
 Key: KAFKA-6477
 URL: https://issues.apache.org/jira/browse/KAFKA-6477
 Project: Kafka
  Issue Type: New Feature
  Components: controller, producer 
Reporter: Litao Deng


Hey folks. I would like to add a feature to support quorum-based acknowledgment 
for producer requests. We have been running a modified version on our testing 
cluster, and the P999 improvement is significant, with very stable latency. 
Additionally, I have a proposal to achieve similar data durability to the 
insync.replicas-based acknowledgment through LEO-based leader election.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-250+Add+Support+for+Quorum-based+Producer+Acknowledge



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6336) when using assign() with kafka consumer the KafkaConsumerGroup command doesnt show those consumers

2018-01-23 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6336?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-6336.

Resolution: Fixed

> when using assign() with kafka consumer the KafkaConsumerGroup command doesnt 
> show those consumers
> --
>
> Key: KAFKA-6336
> URL: https://issues.apache.org/jira/browse/KAFKA-6336
> Project: Kafka
>  Issue Type: Bug
>Reporter: Neerja Khattar
>Priority: Major
>
> The issue is that when Kafka consumers use assign rather than subscribe and 
> commit offsets, the lag cannot be retrieved with the ConsumerGroup command. 
> It doesn't even list those groups.
> The JMX tool also doesn't show lag properly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6421) consumer group can't show when use kafka-consumer-groups tool if the leader of __consumer_offsets partition for this group changed

2018-01-23 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6421?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-6421.

Resolution: Fixed

> consumer group can't show when use kafka-consumer-groups tool if the leader 
> of  __consumer_offsets partition for this group changed
> ---
>
> Key: KAFKA-6421
> URL: https://issues.apache.org/jira/browse/KAFKA-6421
> Project: Kafka
>  Issue Type: Bug
>  Components: offset manager, tools
>Affects Versions: 0.10.2.0, 0.11.0.1, 1.0.0
>Reporter: yuemeng
>Assignee: Vahid Hashemian
>Priority: Critical
>
> The following steps reproduce this problem:
> 1) There is a 3-broker Kafka cluster. Use kafka-console-consumer.sh to 
> consume one topic with a group named "test-consumer-group"; after a while, 
> stop consuming this topic.
> 2) Suppose test-consumer-group writes its offset metadata to partition 3 of 
> __consumer_offsets, and the leader of partition 3 is broker 1.
> 3) We scale the cluster to 5 nodes and reassign partitions, and the leader 
> of partition 3 changes to broker 0.
> 4) Then kafka-consumer-groups.sh --list can't show test-consumer-group, but 
> the offset metadata is still in the __consumer_offsets topic.
> I found that protocolType changes to the empty string for test-consumer-group 
> after the leader change.
> Finally,
> {code}
> listAllGroupsFlattened.filter(_.protocolType == 
> ConsumerProtocol.PROTOCOL_TYPE)
> {code}
> filters out test-consumer-group, so we can't use kafka-consumer-groups.sh to 
> list it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6287) Inconsistent protocol type for empty consumer groups

2018-01-23 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6287?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-6287.

Resolution: Fixed

> Inconsistent protocol type for empty consumer groups
> 
>
> Key: KAFKA-6287
> URL: https://issues.apache.org/jira/browse/KAFKA-6287
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 1.0.0
>Reporter: Ryan Leslie
>Assignee: Jason Gustafson
>Priority: Minor
> Fix For: 1.0.1
>
>
> When a consumer is created for a new group, the group metadata's protocol 
> type is set to 'consumer' and this is stored both in __consumer_offsets as 
> well as in the coordinator's local cache.
> If the consumer leaves the group and the group becomes empty, ListGroups 
> requests will continue to show the group as type 'consumer', and as such 
> kafka-consumer-groups.sh will show it via --list.
> However, if the coordinator (broker) node is killed and a new coordinator is 
> elected, when the GroupMetadataManager loads the group from 
> __consumer_offsets into its cache, it will not set the protocolType if there 
> are no active consumers. As a result, the group's protocolType will now 
> become the empty string (UNKNOWN_PROTOCOL_TYPE), and kafka-consumer-groups.sh 
> will no longer show the group.
> Ideally bouncing a broker should not result in the group's protocol changing. 
> protocolType can be set in GroupMetadataManager.readGroupMessageValue() to 
> always reflect what's present in the persistent metadata (__consumer_offsets) 
> regardless if there are active members.
> In general, things can get confusing when distinguishing between 'consumer' 
> and non-consumer groups. For example, DescribeGroups and OffsetFetchRequest 
> do not filter on protocol type, so it's possible for 
> kafka-consumer-groups.sh to describe groups (--describe) without actually 
> being able to list them. In the protocol guide, OffsetFetchRequest / 
> OffsetCommitRequest have their parameters listed as 'ConsumerGroup', but in 
> reality these can be used for groups of unknown type as well. For instance, a 
> consumer group can be copied by finding a coordinator 
> (GroupCoordinatorRequest / FindCoordinatorRequest) and sending an 
> OffsetCommitRequest. The group will be auto-created with an empty protocol. 
> Although this is arguably correct, the group will now exist but not be  a 
> proper 'consumer' group until later when there is a JoinGroupRequest. Again, 
> this can be confusing as far as categorization / visibility of the group is 
> concerned. A group can instead be copied more directly by creating a consumer 
> and calling commitSync (as kafka-consumer-groups.sh), but this does involve 
> extra connections / requests and so is a little slower when trying to keep a 
> large number of groups in sync in real-time across clusters.
> If we want to make it easier to keep consumer groups consistent, options 
> include allowing groups to be explicitly created with a protocol type instead 
> of only lazily, or for groups created outside of JoinGroupRequest the 
> coordinator can gain a broker config to set a default protocol type for 
> groups. This would default to 'consumer'. This sort of setting might be 
> confusing for users though, since implicitly created groups is certainly not 
> the norm.
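
As a hedged illustration of the "create a consumer and call commitSync" approach to copying a group's offsets mentioned above (class, topic, and group names are made up): the consumer is created with the target group id and the desired offset is committed directly. Note that with manual assignment no JoinGroupRequest is sent, so this call alone does not give the group a 'consumer' protocol type.

{code:java}
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class GroupOffsetCopier {

    // Commits a single partition's offset into the target group via commitSync().
    public static void copyOffset(String bootstrapServers, String targetGroup,
                                  TopicPartition partition, long offset) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, targetGroup);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(Collections.singletonList(partition));
            consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(offset)));
        }
    }

    public static void main(String[] args) {
        // Hypothetical usage: mirror offset 42 of topic-a/0 into group "copied-group".
        copyOffset("localhost:9092", "copied-group", new TopicPartition("topic-a", 0), 42L);
    }
}
{code}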



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-6035) Avoid creating changelog topics for state stores that are directly piped to a sink topic

2018-01-23 Thread Richard Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16336798#comment-16336798
 ] 

Richard Yu edited comment on KAFKA-6035 at 1/24/18 2:50 AM:


[~guozhang] Currently, I am unsure which class represents the changelog, 
because in the Javadoc of {{KGroupedStream.java}} I found the following:

 
{code:java}
/**
 * Aggregate the values of records in this stream by the grouped key.
 * Records with {@code null} key or value are ignored.
 * Aggregating is a generalization of {@link #reduce(Reducer) combining via reduce(...)} as it, for example,
 * allows the result to have a different type than the input values.
 * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
 * that can be queried using the provided {@code queryableStoreName}.
 * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
 ...
 */
<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                             final Aggregator<? super K, ? super V, VR> aggregator,
                             final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
{code}
In the last line of the above excerpt, it appears that KTable is referred to as 
a changelog stream. Could you please help clarify this for me?

Thanks.

 


was (Author: yohan123):
[~guozhang] Currently, I am unsure which class represents the changelog, 
because in the Javadoc of {{KGroupedStream.java}} I found the following:

 
{code:java}
/**
 * Aggregate the values of records in this stream by the grouped key.
 * Records with {@code null} key or value are ignored.
 * Aggregating is a generalization of {@link #reduce(Reducer) combining via reduce(...)} as it, for example,
 * allows the result to have a different type than the input values.
 * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
 * that can be queried using the provided {@code queryableStoreName}.
 * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
 ...
 */
<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                             final Aggregator<? super K, ? super V, VR> aggregator,
                             final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
{code}
In the last line of the above excerpt, it appears that KTable is referred to as 
a changelog stream. Could you please help clarify this for me?

Thanks.

 

> Avoid creating changelog topics for state stores that are directly piped to a 
> sink topic
> 
>
> Key: KAFKA-6035
> URL: https://issues.apache.org/jira/browse/KAFKA-6035
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Richard Yu
>Priority: Major
>
> Today Streams makes all state stores backed by a changelog topic by default 
> unless users override it via {{disableLogging}} when creating the state 
> store / materializing the KTable. However, there are a few cases where a 
> separate changelog topic is not required because we can re-use an existing 
> topic for it. This ticket summarizes a specific case that can be optimized:
> Consider the case where a KTable is materialized and then sent directly into 
> a sink topic with the same key, e.g.
> {code}
> table1 = stream.groupBy(...).aggregate("state1").to("topic2");
> {code}
> Then we do not need to create a {{state1-changelog}} but can just use 
> {{topic2}} as its changelog.
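
For reference, a sketch of the pattern in question against the 1.0 Streams API (topic names, serdes, and the counting aggregation are assumed): today the only way to avoid the extra changelog topic is the explicit {{withLoggingDisabled()}} opt-out, which drops the changelog entirely rather than reusing {{topic2}} as proposed here.

{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.state.KeyValueStore;

public class ChangelogReuseSketch {

    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> stream =
                builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()));

        // Count per key into a store named "state1". Without withLoggingDisabled()
        // a "state1-changelog" topic is created even though the result goes
        // straight to "topic2", which could serve as the changelog instead.
        KTable<String, Long> table1 = stream
                .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
                .aggregate(
                        () -> 0L,
                        (key, value, aggregate) -> aggregate + 1L,
                        Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("state1")
                                .withKeySerde(Serdes.String())
                                .withValueSerde(Serdes.Long())
                                .withLoggingDisabled());

        table1.toStream().to("topic2", Produced.with(Serdes.String(), Serdes.Long()));

        System.out.println(builder.build().describe());
    }
}
{code}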



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6035) Avoid creating changelog topics for state stores that are directly piped to a sink topic

2018-01-23 Thread Richard Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16336798#comment-16336798
 ] 

Richard Yu commented on KAFKA-6035:
---

[~guozhang] Currently, I am unsure which class represents the changelog, 
because in the Javadoc of {{KGroupedStream.java}} I found the following:

 
{code:java}
/**
 * Aggregate the values of records in this stream by the grouped key.
 * Records with {@code null} key or value are ignored.
 * Aggregating is a generalization of {@link #reduce(Reducer) combining via reduce(...)} as it, for example,
 * allows the result to have a different type than the input values.
 * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
 * that can be queried using the provided {@code queryableStoreName}.
 * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
 ...
 */
<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                             final Aggregator<? super K, ? super V, VR> aggregator,
                             final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
{code}
In the last line of the above excerpt, it appears that KTable is referred to as 
a changelog stream. Could you please help clarify this for me?

Thanks.

 

> Avoid creating changelog topics for state stores that are directly piped to a 
> sink topic
> 
>
> Key: KAFKA-6035
> URL: https://issues.apache.org/jira/browse/KAFKA-6035
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Richard Yu
>Priority: Major
>
> Today Streams makes all state stores backed by a changelog topic by default 
> unless users override it via {{disableLogging}} when creating the state 
> store / materializing the KTable. However, there are a few cases where a 
> separate changelog topic is not required because we can re-use an existing 
> topic for it. This ticket summarizes a specific case that can be optimized:
> Consider the case where a KTable is materialized and then sent directly into 
> a sink topic with the same key, e.g.
> {code}
> table1 = stream.groupBy(...).aggregate("state1").to("topic2");
> {code}
> Then we do not need to create a {{state1-changelog}} but can just use 
> {{topic2}} as its changelog.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6035) Avoid creating changelog topics for state stores that are directly piped to a sink topic

2018-01-23 Thread Richard Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16336667#comment-16336667
 ] 

Richard Yu commented on KAFKA-6035:
---

If it is ok, I will be taking this one. Thanks.

> Avoid creating changelog topics for state stores that are directly piped to a 
> sink topic
> 
>
> Key: KAFKA-6035
> URL: https://issues.apache.org/jira/browse/KAFKA-6035
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Richard Yu
>Priority: Major
>
> Today Streams makes all state stores backed by a changelog topic by default 
> unless users override it via {{disableLogging}} when creating the state 
> store / materializing the KTable. However, there are a few cases where a 
> separate changelog topic is not required because we can re-use an existing 
> topic for it. This ticket summarizes a specific case that can be optimized:
> Consider the case where a KTable is materialized and then sent directly into 
> a sink topic with the same key, e.g.
> {code}
> table1 = stream.groupBy(...).aggregate("state1").to("topic2");
> {code}
> Then we do not need to create a {{state1-changelog}} but can just use 
> {{topic2}} as its changelog.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-6035) Avoid creating changelog topics for state stores that are directly piped to a sink topic

2018-01-23 Thread Richard Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6035?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Richard Yu reassigned KAFKA-6035:
-

Assignee: Richard Yu

> Avoid creating changelog topics for state stores that are directly piped to a 
> sink topic
> 
>
> Key: KAFKA-6035
> URL: https://issues.apache.org/jira/browse/KAFKA-6035
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Richard Yu
>Priority: Major
>
> Today Streams makes all state stores backed by a changelog topic by default 
> unless users override it via {{disableLogging}} when creating the state 
> store / materializing the KTable. However, there are a few cases where a 
> separate changelog topic is not required because we can re-use an existing 
> topic for it. This ticket summarizes a specific case that can be optimized:
> Consider the case where a KTable is materialized and then sent directly into 
> a sink topic with the same key, e.g.
> {code}
> table1 = stream.groupBy(...).aggregate("state1").to("topic2");
> {code}
> Then we do not need to create a {{state1-changelog}} but can just use 
> {{topic2}} as its changelog.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6245) Enable reconfiguration of default topic configs used by brokers

2018-01-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16336652#comment-16336652
 ] 

ASF GitHub Bot commented on KAFKA-6245:
---

rajinisivaram opened a new pull request #4466: KAFKA-6245: Dynamic update of 
topic config defaults
URL: https://github.com/apache/kafka/pull/4466
 
 
   Dynamic update of default topic configs as described in KIP-226.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Enable reconfiguration of default topic configs used by brokers
> ---
>
> Key: KAFKA-6245
> URL: https://issues.apache.org/jira/browse/KAFKA-6245
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 1.1.0
>
>
> See 
> [KIP-226|https://cwiki.apache.org/confluence/display/KAFKA/KIP-226+-+Dynamic+Broker+Configuration]
>  for details.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6058) KIP-222: Add "describe consumer groups" and "list consumer groups" to KafkaAdminClient

2018-01-23 Thread Jeff Widman (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6058?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Widman updated KAFKA-6058:
---
Summary: KIP-222: Add "describe consumer groups" and "list consumer groups" 
to KafkaAdminClient  (was: Add "describe consumer groups" and "list consumer 
groups" to KafkaAdminClient)

> KIP-222: Add "describe consumer groups" and "list consumer groups" to 
> KafkaAdminClient
> --
>
> Key: KAFKA-6058
> URL: https://issues.apache.org/jira/browse/KAFKA-6058
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Matthias J. Sax
>Assignee: Jorge Quilcate
>Priority: Major
>  Labels: needs-kip
>
> {{KafkaAdminClient}} does not allow getting information about consumer groups. 
> This feature is supported by the old {{kafka.admin.AdminClient}} though.
> We should add {{KafkaAdminClient#describeConsumerGroups()}} and 
> {{KafkaAdminClient#listConsumerGroup()}}.
> Associated KIP: KIP-222



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6058) KIP-222: Add "describe consumer groups" and "list consumer groups" to KafkaAdminClient

2018-01-23 Thread Jeff Widman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6058?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16336639#comment-16336639
 ] 

Jeff Widman commented on KAFKA-6058:


Note that listing group offsets has also been added to KIP-222

> KIP-222: Add "describe consumer groups" and "list consumer groups" to 
> KafkaAdminClient
> --
>
> Key: KAFKA-6058
> URL: https://issues.apache.org/jira/browse/KAFKA-6058
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Matthias J. Sax
>Assignee: Jorge Quilcate
>Priority: Major
>  Labels: needs-kip
>
> {{KafkaAdminClient}} does not allow getting information about consumer groups. 
> This feature is supported by the old {{kafka.admin.AdminClient}} though.
> We should add {{KafkaAdminClient#describeConsumerGroups()}} and 
> {{KafkaAdminClient#listConsumerGroup()}}.
> Associated KIP: KIP-222



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6058) Add "describe consumer groups" and "list consumer groups" to KafkaAdminClient

2018-01-23 Thread Jeff Widman (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6058?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Widman updated KAFKA-6058:
---
Description: 
{{KafkaAdminClient}} does not allow getting information about consumer groups. 
This feature is supported by the old {{kafka.admin.AdminClient}} though.

We should add {{KafkaAdminClient#describeConsumerGroups()}} and 
{{KafkaAdminClient#listConsumerGroup()}}.

Associated KIP: KIP-222

  was:
{{KafkaAdminClient}} does not allow getting information about consumer groups. 
This feature is supported by the old {{kafka.admin.AdminClient}} though.

We should add {{KafkaAdminClient#describeConsumerGroups()}} and 
{{KafkaAdminClient#listConsumerGroup()}}.


> Add "describe consumer groups" and "list consumer groups" to KafkaAdminClient
> -
>
> Key: KAFKA-6058
> URL: https://issues.apache.org/jira/browse/KAFKA-6058
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Matthias J. Sax
>Assignee: Jorge Quilcate
>Priority: Major
>  Labels: needs-kip
>
> {{KafkaAdminClient}} does not allow getting information about consumer groups. 
> This feature is supported by the old {{kafka.admin.AdminClient}} though.
> We should add {{KafkaAdminClient#describeConsumerGroups()}} and 
> {{KafkaAdminClient#listConsumerGroup()}}.
> Associated KIP: KIP-222



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6244) Enable dynamic reconfiguration of log cleaners

2018-01-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6244?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16336617#comment-16336617
 ] 

ASF GitHub Bot commented on KAFKA-6244:
---

rajinisivaram opened a new pull request #4465: KAFKA-6244: Dynamic update of 
log cleaner configuration
URL: https://github.com/apache/kafka/pull/4465
 
 
   Log cleaner config update as described in KIP-226. Config updates are 
handled by stopping cleaner threads and starting new ones with the new config. 
This keeps the code simple and ensures that if any of the threads had 
terminated earlier due to an exception, new ones are created to match the 
configured thread count.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Enable dynamic reconfiguration of log cleaners
> --
>
> Key: KAFKA-6244
> URL: https://issues.apache.org/jira/browse/KAFKA-6244
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 1.1.0
>
>
> See 
> [KIP-226|https://cwiki.apache.org/confluence/display/KAFKA/KIP-226+-+Dynamic+Broker+Configuration]
>  for details.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6243) Enable reconfiguration of metrics reporters and their custom configs

2018-01-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16336572#comment-16336572
 ] 

ASF GitHub Bot commented on KAFKA-6243:
---

rajinisivaram opened a new pull request #4464: KAFKA-6243: Enable dynamic 
updates of broker metrics reporters
URL: https://github.com/apache/kafka/pull/4464
 
 
   Dynamic metrics reporter updates described in KIP-226. This includes:
- Addition and removal of metrics reporters
- Reconfiguration of custom metrics reporter configs
- Tests for metrics reporter updates at default cluster-level and as 
per-broker config for testing
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Enable reconfiguration of metrics reporters and their custom configs
> 
>
> Key: KAFKA-6243
> URL: https://issues.apache.org/jira/browse/KAFKA-6243
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 1.1.0
>
>
> See 
> [KIP-226|https://cwiki.apache.org/confluence/display/KAFKA/KIP-226+-+Dynamic+Broker+Configuration]
>  for details.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6476) Document dynamic config update

2018-01-23 Thread Rajini Sivaram (JIRA)
Rajini Sivaram created KAFKA-6476:
-

 Summary: Document dynamic config update
 Key: KAFKA-6476
 URL: https://issues.apache.org/jira/browse/KAFKA-6476
 Project: Kafka
  Issue Type: Sub-task
  Components: core, documentation
Reporter: Rajini Sivaram
Assignee: Rajini Sivaram
 Fix For: 1.1.0


Add documentation for dynamic broker config update.

Include:

  - Command line options for kafka-configs.sh with examples

  - Configs that can be updated along with constraints applied



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6461) TableTableJoinIntegrationTest is unstable if caching is enabled

2018-01-23 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6461?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-6461.

Resolution: Fixed

> TableTableJoinIntegrationTest is unstable if caching is enabled
> ---
>
> Key: KAFKA-6461
> URL: https://issues.apache.org/jira/browse/KAFKA-6461
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Reporter: Matthias J. Sax
>Assignee: Ted Yu
>Priority: Minor
>
> {noformat}
> org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
> testLeftInner[caching enabled = true] FAILED
> 20:41:05 java.lang.AssertionError: Condition not met within timeout 
> 15000. Never received expected final result.
> 20:41:05 at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:276)
> 20:41:05 at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:254)
> 20:41:05 at 
> org.apache.kafka.streams.integration.AbstractJoinIntegrationTest.runTest(AbstractJoinIntegrationTest.java:248)
> 20:41:05 at 
> org.apache.kafka.streams.integration.TableTableJoinIntegrationTest.testLeftInner(TableTableJoinIntegrationTest.java:313){noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6473) Add MockProcessorContext to public test-utils

2018-01-23 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6473?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-6473:
---
Labels: needs-kip user-experience  (was: needs-kip)

> Add MockProcessorContext to public test-utils
> -
>
> Key: KAFKA-6473
> URL: https://issues.apache.org/jira/browse/KAFKA-6473
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Matthias J. Sax
>Priority: Major
>  Labels: needs-kip, user-experience
>
> With KIP-247, we added a public test-utils artifact with a TopologyTestDriver 
> class. Using the test driver for a single 
> Processor/Transformer/ValueTransformer requires specifying a whole topology 
> with a source and a sink plus the Processor/Transformer/ValueTransformer 
> wired into it.
> For unit testing, it might be more convenient to have a MockProcessorContext 
> that can be used to test the Processor/Transformer/ValueTransformer in 
> isolation. I.e., the test itself creates a new 
> Processor/Transformer/ValueTransformer object and calls init() manually, 
> passing in the MockProcessorContext.
> This is a public API change and requires a KIP: 
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-3625) Move kafka-streams test fixtures into a published package

2018-01-23 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3625?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-3625:
---
Labels: kip user-experience  (was: needs-kip user-experience)

> Move kafka-streams test fixtures into a published package
> -
>
> Key: KAFKA-3625
> URL: https://issues.apache.org/jira/browse/KAFKA-3625
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Jeff Klukas
>Assignee: Matthias J. Sax
>Priority: Minor
>  Labels: kip, user-experience
> Fix For: 1.1.0
>
>
> The KStreamTestDriver and related fixtures defined in 
> streams/src/test/java/org/apache/kafka/test would be useful to developers 
> building applications on top of Kafka Streams, but they are not currently 
> exposed in a package.
> I propose moving this directory to live under streams/fixtures/src/main and 
> creating a new 'streams:fixtures' project in the gradle configuration to 
> publish these as a separate package.
> KIP: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-247%3A+Add+public+test+utils+for+Kafka+Streams



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6461) TableTableJoinIntegrationTest is unstable if caching is enabled

2018-01-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16336529#comment-16336529
 ] 

ASF GitHub Bot commented on KAFKA-6461:
---

guozhangwang closed pull request #4451: KAFKA-6461 
TableTableJoinIntegrationTest is unstable if caching is enabled
URL: https://github.com/apache/kafka/pull/4451
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java
index f3eceb09735..b5e6fcb63b9 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java
@@ -79,7 +79,7 @@ public void prepareTopology() throws InterruptedException {
 @Override
 public void apply(final Long key, final String value) {
 numRecordsExpected++;
-if (value.equals(expected)) {
+if (expected.equals(value)) {
 boolean ret = finalResultReached.compareAndSet(false, true);
 
 if (!ret) {


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> TableTableJoinIntegrationTest is unstable if caching is enabled
> ---
>
> Key: KAFKA-6461
> URL: https://issues.apache.org/jira/browse/KAFKA-6461
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Reporter: Matthias J. Sax
>Assignee: Ted Yu
>Priority: Minor
>
> {noformat}
> org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
> testLeftInner[caching enabled = true] FAILED
> 20:41:05 java.lang.AssertionError: Condition not met within timeout 
> 15000. Never received expected final result.
> 20:41:05 at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:276)
> 20:41:05 at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:254)
> 20:41:05 at 
> org.apache.kafka.streams.integration.AbstractJoinIntegrationTest.runTest(AbstractJoinIntegrationTest.java:248)
> 20:41:05 at 
> org.apache.kafka.streams.integration.TableTableJoinIntegrationTest.testLeftInner(TableTableJoinIntegrationTest.java:313){noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6471) seekToEnd and seek give unclear results for Consumer with read_committed isolation level

2018-01-23 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16336483#comment-16336483
 ] 

Jason Gustafson commented on KAFKA-6471:


Thanks for reporting the issue. It would be helpful to understand your use case 
better. Why is it useful to distinguish between the last committed offset and 
the last stable offset?

> seekToEnd and seek give unclear results for Consumer with read_committed 
> isolation level
> 
>
> Key: KAFKA-6471
> URL: https://issues.apache.org/jira/browse/KAFKA-6471
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.0.0
>Reporter: Coen Damen
>Priority: Major
>
> I am using the transactional KafkaProducer to send messages to a topic. This 
> works fine. I use a KafkaConsumer with read_committed isolation level and I 
> have an issue with the seek and seekToEnd methods. According to the 
> documentation, the seek and seekToEnd methods give me the LSO (Last Stable 
> Offset). But this is a bit confusing, as it always gives me the same value: 
> the END of the topic, no matter whether the last entry was committed (by the 
> Producer) or part of an aborted transaction. For example, after I abort the 
> last 5 attempts to insert 20_000 messages each, the last 100_000 records 
> should not be read by the Consumer. Yet seekToEnd moves to the end of the 
> Topic (including the 100_000 messages), although poll() does not return them.
> I am looking for a way to retrieve the Last Committed Offset (so the last 
> message successfully committed by the Producer). There seems to be no proper 
> API method for this. So do I need to roll my own?
> One option would be to move back and poll until no more records are 
> retrieved; this would yield the last committed message. But I would assume 
> that Kafka provides such a method.
> We use Kafka 1.0.0.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6475) ConfigException on the broker results in UnknownServerException in the admin client

2018-01-23 Thread JIRA
Xavier Léauté created KAFKA-6475:


 Summary: ConfigException on the broker results in 
UnknownServerException in the admin client
 Key: KAFKA-6475
 URL: https://issues.apache.org/jira/browse/KAFKA-6475
 Project: Kafka
  Issue Type: Bug
Affects Versions: 1.0.0
Reporter: Xavier Léauté
Assignee: Colin P. McCabe


Calling AdminClient.alterConfigs with an invalid configuration may cause 
ConfigException to be thrown on the broker side, which results in an 
UnknownServerException thrown by the admin client. It would probably make more 
sense for the admin client to throw InvalidConfigurationException in that case.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5660) Don't throw TopologyBuilderException during runtime

2018-01-23 Thread Nick Afshartous (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16336402#comment-16336402
 ] 

Nick Afshartous commented on KAFKA-5660:


I'd like to take this as a starter task. 

> Don't throw TopologyBuilderException during runtime
> ---
>
> Key: KAFKA-5660
> URL: https://issues.apache.org/jira/browse/KAFKA-5660
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Affects Versions: 0.11.0.0
>Reporter: Matthias J. Sax
>Priority: Major
>
> {{TopologyBuilderException}} is a pre-runtime exception that should only be 
> thrown before {{KafkaStreams#start()}} is called.
> However, we do throw {{TopologyBuilderException}} within
> - `SourceNodeFactory#getTopics`
> - `ProcessorContextImpl#getStateStore`
> - `StreamPartitionAssignor#prepareTopic `
> (and maybe somewhere else: we should double check if there are other places 
> in the code like those).
> We should replace those exceptions with either {{StreamsException}} or with a 
> new exception type.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6472) WordCount example code error

2018-01-23 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6472?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-6472:
---
Priority: Trivial  (was: Major)

> WordCount example code error
> 
>
> Key: KAFKA-6472
> URL: https://issues.apache.org/jira/browse/KAFKA-6472
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation, streams
>Affects Versions: 0.11.0.2
>Reporter: JONYhao
>Assignee: Joel Hamill
>Priority: Trivial
>  Labels: newbie
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> There is a "(" missing in the WordCount example tutorial
> [https://kafka.apache.org/10/documentation/streams/tutorial]
> at the end of the page, line 31:
> {{.to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long());}}
> should be
> {{.to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-6472) WordCount example code error

2018-01-23 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6472?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-6472:
--

Assignee: Joel Hamill

> WordCount example code error
> 
>
> Key: KAFKA-6472
> URL: https://issues.apache.org/jira/browse/KAFKA-6472
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation, streams
>Affects Versions: 0.11.0.2
>Reporter: JONYhao
>Assignee: Joel Hamill
>Priority: Major
>  Labels: newbie
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> There is a "(" missing in the WordCount example tutorial
> [https://kafka.apache.org/10/documentation/streams/tutorial]
> at the end of the page, line 31:
> {{.to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long());}}
> should be
> {{.to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6472) WordCount example code error

2018-01-23 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6472?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-6472:
---
Flags:   (was: Important)

> WordCount example code error
> 
>
> Key: KAFKA-6472
> URL: https://issues.apache.org/jira/browse/KAFKA-6472
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation, streams
>Affects Versions: 0.11.0.2
>Reporter: JONYhao
>Assignee: Joel Hamill
>Priority: Trivial
>  Labels: newbie
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> There is a "(" missing in the WordCount example tutorial
> [https://kafka.apache.org/10/documentation/streams/tutorial]
> at the end of the page, line 31:
> {{.to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long());}}
> should be
> {{.to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6474) Rewrite test to use new public TopologyTestDriver

2018-01-23 Thread Matthias J. Sax (JIRA)
Matthias J. Sax created KAFKA-6474:
--

 Summary: Rewrite test to use new public TopologyTestDriver
 Key: KAFKA-6474
 URL: https://issues.apache.org/jira/browse/KAFKA-6474
 Project: Kafka
  Issue Type: Improvement
  Components: streams, unit tests
Affects Versions: 1.1.0
Reporter: Matthias J. Sax


With KIP-247 we added the public TopologyTestDriver. We should rewrite our own 
tests to use this new test driver and remove the two classes 
ProcessorTopologyTestDriver and KStreamTestDriver.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6473) Add MockProcessorContext to public test-utils

2018-01-23 Thread Matthias J. Sax (JIRA)
Matthias J. Sax created KAFKA-6473:
--

 Summary: Add MockProcessorContext to public test-utils
 Key: KAFKA-6473
 URL: https://issues.apache.org/jira/browse/KAFKA-6473
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Affects Versions: 1.1.0
Reporter: Matthias J. Sax


With KIP-247, we added a public test-utils artifact with a TopologyTestDriver 
class. Using the test driver to test a single 
Processor/Transformer/ValueTransformer requires specifying a whole topology, 
with a source and a sink, and wiring the 
Processor/Transformer/ValueTransformer into it.

For unit testing, it might be more convenient to have a MockProcessorContext 
that can be used to test the Processor/Transformer/ValueTransformer in 
isolation. I.e., the test itself creates a new 
Processor/Transformer/ValueTransformer object and calls init() manually, 
passing in the MockProcessorContext; see the sketch below.
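
Purely as an illustration of the intended usage pattern (MockProcessorContext 
does not exist yet, so the constructor below and any capture/assertion helpers 
are assumptions about the proposed API, and MyProcessor stands in for an 
arbitrary user Processor):

// hypothetical test body; will not compile until a MockProcessorContext class exists
MyProcessor processor = new MyProcessor();
MockProcessorContext context = new MockProcessorContext();
processor.init(context);                     // init() is called manually, as described above
processor.process("some-key", "some-value");
// the mock context could then expose whatever the processor forwarded or scheduled,
// so the test can assert on it without building a source/sink topology around it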

This is a public API change and requires a KIP: 
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6469) ISR change notification queue can prevent controller from making progress

2018-01-23 Thread Jeff Widman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16336114#comment-16336114
 ] 

Jeff Widman commented on KAFKA-6469:


We hit a similar issue when doing partition re-assignments across the cluster 
and the total payload was greater than 1MB... We solved it by raising 
ZooKeeper's jute.maxbuffer limit to several MB.
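
For anyone looking for the mechanics of that workaround: jute.maxbuffer is a 
JVM system property, so it has to be raised on the ZooKeeper servers and on any 
client JVM that must read the oversized znodes (the 4 MB value below is only an 
example, not a recommendation from this ticket):

# on the ZooKeeper servers (e.g. via zookeeper-env.sh) before starting them
export JVMFLAGS="-Djute.maxbuffer=4194304"

# on the Kafka brokers, which act as ZooKeeper clients
export KAFKA_OPTS="-Djute.maxbuffer=4194304"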

> ISR change notification queue can prevent controller from making progress
> -
>
> Key: KAFKA-6469
> URL: https://issues.apache.org/jira/browse/KAFKA-6469
> Project: Kafka
>  Issue Type: Bug
>Reporter: Kyle Ambroff-Kao
>Assignee: Kyle Ambroff-Kao
>Priority: Major
>
> When writes to /isr_change_notification in ZooKeeper (which is effectively a 
> queue of ISR change events for the controller) happen at a rate high enough 
> that the node with a watch can't dequeue them, the trouble starts.
> The watcher kafka.controller.IsrChangeNotificationListener is fired in the 
> controller when a new entry is written to /isr_change_notification, and the 
> zkclient library sends a GetChildrenRequest to zookeeper to fetch all child 
> znodes.
> We've seen failures in one of our test clusters as the partition count started 
> to climb north of 60k per broker. We had brokers writing child nodes under 
> /isr_change_notification that were larger than the jute.maxbuffer size in 
> ZooKeeper (1MB), causing the ZooKeeper server to drop the controller's 
> session, effectively bricking the cluster.
> This can be partially mitigated by chunking ISR notifications to increase the 
> maximum number of partitions a broker can host.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6472) WordCount example code error

2018-01-23 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6472?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-6472:
-
Component/s: streams

> WordCount example code error
> 
>
> Key: KAFKA-6472
> URL: https://issues.apache.org/jira/browse/KAFKA-6472
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation, streams
>Affects Versions: 0.11.0.2
>Reporter: JONYhao
>Priority: Major
>  Labels: newbie
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> This is a "(" missed in the WordCount example tutorial
> [https://kafka.apache.org/10/documentation/streams/tutorial]
> at the end of the page ,line 31
> 31   {{.to(}}{{"streams-wordcount-output"}}{{, Produced.with(Serdes.String(), 
> Serdes.Long());}}
> {{should be }}
> {{31 }}{{.to(}}{{"streams-wordcount-output"}}{{, 
> Produced.with(Serdes.String(), Serdes.Long()));}}{{}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6472) WordCount example code error

2018-01-23 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6472?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-6472:
-
Labels: newbie  (was: maven)

> WordCount example code error
> 
>
> Key: KAFKA-6472
> URL: https://issues.apache.org/jira/browse/KAFKA-6472
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation
>Affects Versions: 0.11.0.2
>Reporter: JONYhao
>Priority: Major
>  Labels: newbie
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> This is a "(" missed in the WordCount example tutorial
> [https://kafka.apache.org/10/documentation/streams/tutorial]
> at the end of the page ,line 31
> 31   {{.to(}}{{"streams-wordcount-output"}}{{, Produced.with(Serdes.String(), 
> Serdes.Long());}}
> {{should be }}
> {{31 }}{{.to(}}{{"streams-wordcount-output"}}{{, 
> Produced.with(Serdes.String(), Serdes.Long()));}}{{}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6470) no continuous offset for function seek

2018-01-23 Thread chao.wu (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6470?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

chao.wu updated KAFKA-6470:
---
Description: 
A topic-partition "adn-tracking,15"  in kafka  who's   earliest offset is  
1255644602 and  latest offset is 1271253441.  

while starting a spark streaming to process the data from the topic ,  we got a 
exception with "Got wrong record   even after seeking to offset 1266921577".

I  implemented a simple project to use consumer to  seek offset 1266921577. But 
it return the offset 1266921578. Then while  seek to 1266921576, it return the 
1266921576 exactly。 

Why ?  How to fix that ?

 

 

There is the code:

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class consumerDemo {

    public static void main(String[] argv) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "172.31.29.31:9091");
        props.put("group.id", "consumer-tutorial-demo");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // assign the single partition and seek to the offset in question
        TopicPartition tp = new TopicPartition("adn-tracking-click", 15);
        Collection<TopicPartition> collection = new ArrayList<>();
        collection.add(tp);
        consumer.assign(collection);
        consumer.seek(tp, 1266921576);

        // poll once and print the offset of the first record that comes back
        ConsumerRecords<String, String> consumerRecords = consumer.poll(1);
        List<ConsumerRecord<String, String>> listR = consumerRecords.records(tp);
        Iterator<ConsumerRecord<String, String>> iter = listR.iterator();
        ConsumerRecord<String, String> record = iter.next();
        System.out.println(" the next record " + record.offset() + " record topic " + record.topic());
    }
}

  was:
A topic-partition "adn-tracking,15" in Kafka whose earliest offset is 
1255644602 and latest offset is 1271253441.

While starting a Spark Streaming job to process the data from the topic, we got 
an exception: "Got wrong record even after seeking to offset 1266921577".

I implemented a simple project that uses a consumer to seek to offset 
1266921577, but it returned offset 1266921578. Then, when seeking to 
1266921576, it returned exactly 1266921576.

Why? How can this be fixed?

Here is the code:

public class consumerDemo {

 public static void main(String[] argv) {
 Properties props = new Properties();
 props.put("bootstrap.servers", "172.31.29.31:9091");
 props.put("group.id", "consumer-tutorial-demo");
 props.put("key.deserializer", StringDeserializer.class.getName());
 props.put("value.deserializer", StringDeserializer.class.getName());
 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
 TopicPartition tp = new TopicPartition("adn-tracking-click", 15);
 Collection<TopicPartition> collection = new ArrayList<>();
 collection.add(tp);
 consumer.assign(collection);
 consumer.seek(tp, 1266921576);
 ConsumerRecords<String, String> consumerRecords = consumer.poll(1);
 List<ConsumerRecord<String, String>> listR = consumerRecords.records(tp);
 Iterator<ConsumerRecord<String, String>> iter = listR.iterator();
 ConsumerRecord<String, String> record = iter.next();
 System.out.println(" the next record " + record.offset() + " record topic " + record.topic());
 }
}


> no continuous offset for function seek
> --
>
> Key: KAFKA-6470
> URL: https://issues.apache.org/jira/browse/KAFKA-6470
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, core
>Affects Versions: 0.10.0.1
>Reporter: chao.wu
>Priority: Major
>
> A topic-partition "adn-tracking,15" in Kafka whose earliest offset is 
> 1255644602 and latest offset is 1271253441.
> While starting a Spark Streaming job to process the data from the topic, we 
> got an exception: "Got wrong record even after seeking to offset 1266921577".
> I implemented a simple project that uses a consumer to seek to offset 
> 1266921577, but it returned offset 1266921578. Then, when seeking to 
> 1266921576, it returned exactly 1266921576.
> Why? How can this be fixed?
>  
> Here is the code:
> public class consumerDemo {
> public static void main(String[] argv)
> {
>  
> Properties props = new Properties();
> props.put("bootstrap.servers", "172.31.29.31:9091");
> props.put("group.id", "consumer-tutorial-demo");
> props.put("key.deserializer", StringDeserializer.class.getName());
> props.put("value.deserializer", StringDeserializer.class.getName());
> KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
> TopicPartition tp = new TopicPartition("adn-tracking-click", 15);
> Collection<TopicPartition> collection = new ArrayList<TopicPartition>();
> collection.add(tp);
> consumer.assign(collection);
> consumer.seek(tp, 1266921576);
> ConsumerRecords<String, String> consumerRecords = consumer.poll(1);
> List<ConsumerRecord<String, String>> listR = 

[jira] [Created] (KAFKA-6472) WordCount example code error

2018-01-23 Thread JONYhao (JIRA)
JONYhao created KAFKA-6472:
--

 Summary: WordCount example code error
 Key: KAFKA-6472
 URL: https://issues.apache.org/jira/browse/KAFKA-6472
 Project: Kafka
  Issue Type: Bug
  Components: documentation
Affects Versions: 0.11.0.2
Reporter: JONYhao


This is a "(" missed in the WordCount example tutorial

[https://kafka.apache.org/10/documentation/streams/tutorial]

at the end of the page ,line 31

31   {{.to(}}{{"streams-wordcount-output"}}{{, Produced.with(Serdes.String(), 
Serdes.Long());}}

{{should be }}

{{31 }}{{.to(}}{{"streams-wordcount-output"}}{{, Produced.with(Serdes.String(), 
Serdes.Long()));}}{{}}
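
For context, a sketch of roughly how the corrected call sits in the tutorial's 
WordCount topology (the surrounding operators and names are paraphrased from 
memory, not copied verbatim from the page):

import java.util.Arrays;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Produced;

public class WordCountSketch {

    public static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("streams-plaintext-input")
               // split each line into lower-cased words
               .flatMapValues(line -> Arrays.asList(line.toLowerCase().split("\\W+")))
               .groupBy((key, word) -> word)
               .count()
               .toStream()
               // the line the ticket is about: note the closing ")" before the semicolon
               .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
        return builder.build();
    }
}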



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6471) seekToEnd and seek give unclear results for Consumer with read_committed isolation level

2018-01-23 Thread Coen Damen (JIRA)
Coen Damen created KAFKA-6471:
-

 Summary: seekToEnd and seek give unclear results for Consumer with 
read_committed isolation level
 Key: KAFKA-6471
 URL: https://issues.apache.org/jira/browse/KAFKA-6471
 Project: Kafka
  Issue Type: Bug
Affects Versions: 1.0.0
Reporter: Coen Damen


I am using the transactional KafkaProducer to send messages to a topic. This 
works fine. I use a KafkaConsumer with the read_committed isolation level and I 
have an issue with the seek and seekToEnd methods. According to the 
documentation, the seek and seekToEnd methods give me the LSO (Last Stable 
Offset). But this is a bit confusing, as it always gives me the same value: the 
END of the topic, no matter whether the last entry is committed (by the 
Producer) or part of an aborted transaction. For example, after I abort the 
last 5 tries to insert 20_000 messages each, the last 100_000 records should 
not be read by the Consumer. But seekToEnd still moves to the end of the Topic 
(including the 100_000 messages), although poll() does not return them.

I am looking for a way to retrieve the Last Committed Offset (the offset of the 
last successfully committed message from the Producer). There seems to be no 
proper API method for this. So do I need to roll my own?

One option would be to move back and poll until no more records are retrieved; 
this would yield the last committed message (a rough sketch of that approach is 
below). But I would assume that Kafka provides such a method.

We use Kafka 1.0.0.
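
As a sketch of the "move back and poll" workaround mentioned above (this is not 
an official API; the broker address, topic name, and step size are 
placeholders, and a real implementation would need error handling and a smarter 
search):

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class LastCommittedOffsetSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder broker address
        props.put("isolation.level", "read_committed");
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        try {
            TopicPartition tp = new TopicPartition("machine-logs", 0);   // placeholder topic/partition
            consumer.assign(Collections.singletonList(tp));

            long end = consumer.endOffsets(Collections.singletonList(tp)).get(tp);
            long step = 1_000;                       // how far to jump back per attempt
            ConsumerRecord<String, String> lastCommitted = null;

            for (long start = Math.max(0, end - step); ; start = Math.max(0, start - step)) {
                consumer.seek(tp, start);
                long position = start;
                // Drain the window [start, end); poll() only hands back committed records.
                while (position < end) {
                    ConsumerRecords<String, String> batch = consumer.poll(1000);
                    for (ConsumerRecord<String, String> record : batch.records(tp)) {
                        lastCommitted = record;
                    }
                    long newPosition = consumer.position(tp);
                    if (newPosition == position) {
                        break;                       // no progress, give up on this window
                    }
                    position = newPosition;
                }
                if (lastCommitted != null || start == 0) {
                    break;                           // found one, or scanned the whole partition
                }
            }

            System.out.println(lastCommitted == null
                    ? "no committed records found"
                    : "last committed offset: " + lastCommitted.offset());
        } finally {
            consumer.close();
        }
    }
}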



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)