[jira] [Created] (KAFKA-4765) org.apache.kafka.clients.producer.KafkaProducerTest#testConstructorFailureCloseResource and Similar Tests are Failing on some Systems (127.0.53.53 Collision Warning)

2017-02-14 Thread Armin Braun (JIRA)
Armin Braun created KAFKA-4765:
--

 Summary: 
org.apache.kafka.clients.producer.KafkaProducerTest#testConstructorFailureCloseResource
 and Similar Tests are Failing on some Systems (127.0.53.53 Collision Warning)
 Key: KAFKA-4765
 URL: https://issues.apache.org/jira/browse/KAFKA-4765
 Project: Kafka
  Issue Type: Bug
  Components: unit tests
Affects Versions: 0.10.1.1
 Environment: All DNS environments that properly forward 127.0.53.53
Reporter: Armin Braun
Priority: Critical


The test

{code}
org.apache.kafka.clients.producer.KafkaProducerTest#testConstructorFailureCloseResource
{code}

fails on some systems because the following snippet from 
{code}
org.apache.kafka.clients.ClientUtils#parseAndValidateAddresses
{code}

{code}
InetSocketAddress address = new InetSocketAddress(host, port);

if (address.isUnresolved()) {
    log.warn("Removing server {} from {} as DNS resolution failed for {}",
            url, CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, host);
} else {
    addresses.add(address);
}
{code}

will add the address *some.invalid.hostname.foo.bar* to the addresses list 
without error, since it resolves to *127.0.53.53*, the ICANN marker for a potential 
future collision of the _.bar_ TLD.
The same issue applies to a few other test cases that intentionally use broken 
hostnames.

This can (and should) be fixed by using broken hostname examples that do not 
collide. I would suggest simply putting a ".local" suffix on all hostnames that are 
currently affected by this.
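
For illustration, a minimal standalone check of the suggested approach (a sketch only; the hostname and port below are arbitrary examples, not values taken from the Kafka test suite):

{code}
import java.net.InetSocketAddress;

public class UnresolvableHostCheck {
    public static void main(String[] args) {
        // A suffix kept out of normal DNS (e.g. the ".local" suffix suggested above, or
        // ".invalid" per RFC 2606) should leave the address unresolved, unlike ".bar",
        // which ICANN name-collision handling may resolve to 127.0.53.53.
        InetSocketAddress address =
                new InetSocketAddress("some.invalid.hostname.foo.bar.local", 2018);
        System.out.println("unresolved = " + address.isUnresolved()); // expected: true
    }
}
{code}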



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4765) org.apache.kafka.clients.producer.KafkaProducerTest#testConstructorFailureCloseResource and Similar Tests are Failing on some Systems (127.0.53.53 Collision Warning)

2017-02-14 Thread Armin Braun (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15867408#comment-15867408
 ] 

Armin Braun commented on KAFKA-4765:


I could add a PR for this if you also see it as a valid issue; I have already tried 
the `.local` suffix fix locally with success.

> org.apache.kafka.clients.producer.KafkaProducerTest#testConstructorFailureCloseResource
>  and Similar Tests are Failing on some Systems (127.0.53.53 Collision Warning)
> -
>
> Key: KAFKA-4765
> URL: https://issues.apache.org/jira/browse/KAFKA-4765
> Project: Kafka
>  Issue Type: Bug
>  Components: unit tests
>Affects Versions: 0.10.1.1
> Environment: All DNS environments that properly forward 127.0.53.53
>Reporter: Armin Braun
>Priority: Critical
>
> The test
> {code}
> org.apache.kafka.clients.producer.KafkaProducerTest#testConstructorFailureCloseResource
> {code}
> fails on some systems because this below snippet from 
> {code}
> org.apache.kafka.clients.ClientUtils#parseAndValidateAddresses
> {code}
> {code}
> InetSocketAddress address = new InetSocketAddress(host, port);
> if (address.isUnresolved()) {
>     log.warn("Removing server {} from {} as DNS resolution failed for {}",
>             url, CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, host);
> } else {
>     addresses.add(address);
> }
> {code}
> will add the address *some.invalid.hostname.foo.bar* to the addresses list 
> without error since it is resolved to *127.0.53.53* to indicate potential 
> future collision of the _.bar_ tld.
> The same issue applies to a few other test cases that try to intentionally 
> run into broken hostnames.
> This can (and should) be fixed by using broken hostname examples that do not 
> collide. I would suggest just putting a ".local" suffix on all that are 
> currently affected by this.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [VOTE] KIP-82 Add Record Headers

2017-02-14 Thread Michael Pearce
Thanks all so far for the +1 guys.

@Jay going to move these questions to the discuss thread and reply there. 
Please keep an eye out.

On 14/02/2017, 21:34, "Renu Tewari"  wrote:

+1  after the comprehensive discussion great to see this moving to a vote.



On Tue, Feb 14, 2017 at 1:07 PM, Onur Karaman 
wrote:

> +1
>
> On Tue, Feb 14, 2017 at 10:35 AM, radai 
> wrote:
>
> > +1 from me.
> >
> > also - a more usable link to the discussion thread:
> > http://markmail.org/message/x5wlkieexinovsz3
> >
> > On Tue, Feb 14, 2017 at 9:42 AM, Michael Pearce 
> > wrote:
> >
> > > Hi all,
> > >
> > > We would like to start the voting process for KIP-82 – Add record
> > headers.
> > > The KIP can be found
> > > at
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > 82+-+Add+Record+Headers
> > >
> > > Discussion thread(s) can be found here:
> > >
> > > http://search-hadoop.com/m/Kafka/uyzND1nSTOHTvj81?subj=
> > > Re+DISCUSS+KIP+82+Add+Record+Headers
> > > http://search-hadoop.com/m/Kafka/uyzND1Arxt22Tvj81?subj=
> > > Re+DISCUSS+KIP+82+Add+Record+Headers
> > > http://search-hadoop.com/?project=Kafka=KIP-82
> > >
> > >
> > >
> > > Thanks,
> > > Mike
> > >
> > > The information contained in this email is strictly confidential and
> for
> > > the use of the addressee only, unless otherwise indicated. If you are
> not
> > > the intended recipient, please do not read, copy, use or disclose to
> > others
> > > this message or any attachment. Please also notify the sender by
> replying
> > > to this email or by telephone (+44(020 7896 0011) and then delete the
> > email
> > > and any copies of it. Opinions, conclusion (etc) that do not relate to
> > the
> > > official business of this company shall be understood as neither given
> > nor
> > > endorsed by it. IG is a trading name of IG Markets Limited (a company
> > > registered in England and Wales, company number 04008957) and IG Index
> > > Limited (a company registered in England and Wales, company number
> > > 01190902). Registered address at Cannon Bridge House, 25 Dowgate Hill,
> > > London EC4R 2YA. Both IG Markets Limited (register number 195355) and
> IG
> > > Index Limited (register number 114059) are authorised and regulated by
> > the
> > > Financial Conduct Authority.
> > >
> >
>


The information contained in this email is strictly confidential and for the 
use of the addressee only, unless otherwise indicated. If you are not the 
intended recipient, please do not read, copy, use or disclose to others this 
message or any attachment. Please also notify the sender by replying to this 
email or by telephone (+44(020 7896 0011) and then delete the email and any 
copies of it. Opinions, conclusion (etc) that do not relate to the official 
business of this company shall be understood as neither given nor endorsed by 
it. IG is a trading name of IG Markets Limited (a company registered in England 
and Wales, company number 04008957) and IG Index Limited (a company registered 
in England and Wales, company number 01190902). Registered address at Cannon 
Bridge House, 25 Dowgate Hill, London EC4R 2YA. Both IG Markets Limited 
(register number 195355) and IG Index Limited (register number 114059) are 
authorised and regulated by the Financial Conduct Authority.


Re: [DISCUSS] KIP-112: Handle disk failure for JBOD

2017-02-14 Thread Dong Lin
Hey Jun,

Thanks for all your help and time to discuss this KIP. When you get the
time, could you let me know if the previous answers address the concern?

I think the more interesting question in your last email is where we should
store the "created" flag in ZK. I proposed the solution that I like most,
i.e. storing it together with the replica assignment data in the
/brokers/topics/[topic] znode.
In order to expedite discussion, let me provide another two ideas to
address the concern just in case the first idea doesn't work:

- We can avoid the extra controller ZK read when there is no disk failure (95%
of the time?). When the controller starts, it doesn't read controller_managed_state
in ZK and sends LeaderAndIsrRequest with "create = false". Only if the
LeaderAndIsrResponse shows a failure for some replica will the controller
read controller_managed_state for that partition and re-send the
LeaderAndIsrRequest with "create=true" if that replica has not been created.

- We can significantly reduce this ZK read time by making
controller_managed_state topic-level information in ZK, e.g.
/brokers/topics/[topic]/state. Given that most topics have 10+ partitions, the
extra ZK read time should be less than 10% of the existing total ZK read
time during controller failover.

Thanks!
Dong


On Tue, Feb 14, 2017 at 7:30 AM, Dong Lin  wrote:

> Hey Jun,
>
> I just realized that you may be suggesting that a tool for listing offline
> directories is necessary for KIP-112 by asking whether KIP-112 and KIP-113
> will be in the same release. I think such a tool is useful but doesn't have
> to be included in KIP-112. This is because as of now admin needs to log
> into broker machine and check broker log to figure out the cause of broker
> failure and the bad log directory in case of disk failure. The KIP-112
> won't make it harder since admin can still figure out the bad log directory
> by doing the same thing. Thus it is probably OK to just include this script
> in KIP-113. Regardless, my hope is to finish both KIPs ASAP and make them
> in the same release since both KIPs are needed for the JBOD setup.
>
> Thanks,
> Dong
>
> On Mon, Feb 13, 2017 at 5:52 PM, Dong Lin  wrote:
>
>> And the test plan has also been updated to simulate disk failure by
>> changing log directory permission to 000.
>>
>> On Mon, Feb 13, 2017 at 5:50 PM, Dong Lin  wrote:
>>
>>> Hi Jun,
>>>
>>> Thanks for the reply. These comments are very helpful. Let me answer
>>> them inline.
>>>
>>>
>>> On Mon, Feb 13, 2017 at 3:25 PM, Jun Rao  wrote:
>>>
 Hi, Dong,

 Thanks for the reply. A few more replies and new comments below.

 On Fri, Feb 10, 2017 at 4:27 PM, Dong Lin  wrote:

 > Hi Jun,
 >
 > Thanks for the detailed comments. Please see answers inline:
 >
 > On Fri, Feb 10, 2017 at 3:08 PM, Jun Rao  wrote:
 >
 > > Hi, Dong,
 > >
 > > Thanks for the updated wiki. A few comments below.
 > >
 > > 1. Topics get created
 > > 1.1 Instead of storing successfully created replicas in ZK, could we
 > store
 > > unsuccessfully created replicas in ZK? Since the latter is less
 common,
 > it
 > > probably reduces the load on ZK.
 > >
 >
 > We can store unsuccessfully created replicas in ZK. But I am not sure
 if
 > that can reduce write load on ZK.
 >
 > If we want to reduce the write load on ZK by storing unsuccessfully
 created
 > replicas in ZK, then broker should not write to ZK if all replicas are
 > successfully created. It means that if /broker/topics/[topic]/partiti
 > ons/[partitionId]/controller_managed_state doesn't exist in ZK for a
 given
 > partition, we have to assume all replicas of this partition have been
 > successfully created and send LeaderAndIsrRequest with create =
 false. This
 > becomes a problem if controller crashes before receiving
 > LeaderAndIsrResponse to validate whether a replica has been created.
 >
 > I think this approach can reduce the number of bytes stored in ZK.
 But I am
 > not sure if this is a concern.
 >
 >
 >
 I was mostly concerned about the controller failover time. Currently,
 the
 controller failover is likely dominated by the cost of reading
 topic/partition level information from ZK. If we add another partition
 level path in ZK, it probably will double the controller failover time.
 If
 the approach of representing the non-created replicas doesn't work, have
 you considered just adding the created flag in the leaderAndIsr path in
 ZK?


>>> Yes, I have considered adding the created flag in the leaderAndIsr path
>>> in ZK. If we were to add created flag per replica in the
>>> LeaderAndIsrRequest, then it requires a lot of change in the code base.
>>>
>>> If we don't add created flag per replica 

Re: [VOTE] KIP-111 Kafka should preserve the Principal generated by the PrincipalBuilder while processing the request received on socket channel, on the broker.

2017-02-14 Thread Jun Rao
Hi, Radai,

Currently kafka-acl.sh just creates an ACL path in ZK with the principal
name string. The authorizer module in the broker reads the principal name
string from the acl path in ZK and creates the expected KafkaPrincipal for
matching. As you can see, the expected principal is created on the broker
side, not by the kafka-acl.sh tool. The broker already has the ability to
configure PrincipalBuilder. That's why I am not sure if there is a need for
kafka-acl.sh to customize PrincipalBuilder.

Thanks,

Jun
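
A rough, purely illustrative sketch of the matching described above, assuming the ACL path stores only a principal name string such as "User:alice"; the class and method below are hypothetical and not Kafka APIs:

import java.security.Principal;

public final class AclNameMatch {
    // aclPrincipalName is the string kafka-acls.sh wrote for the ACL (e.g. "User:alice");
    // the broker-side authorizer rebuilds the expected principal from this string and
    // compares it against the name of the authenticated principal.
    static boolean matches(String aclPrincipalName, Principal authenticated) {
        return aclPrincipalName.equals("User:" + authenticated.getName());
    }
}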


On Mon, Feb 13, 2017 at 7:01 PM, radai  wrote:

> if i understand correctly, kafka-acls.sh spins up an instance of (the
> custom, in our case) Authorizer, and calls things like addAcls(acls:
> Set[Acl], resource: Resource) on it, which are defined in the interface,
> hence expected to be "extensible".
>
> (side note: if Authorizer and PrincipalBuilder are defined as extensible
> interfaces, why doesnt class Acl, which is in the signature for Authorizer
> calls, use java.security.Principal?)
>
> we would like to be able to use the standard kafka-acl command line for
> defining ACLs even when replacing the vanilla Authorizer and
> PrincipalBuilder (even though we have a management UI for these operations
> within linkedin) - simply because thats the correct thing to do from an
> extensibility point of view.
>
> On Mon, Feb 13, 2017 at 1:39 PM, Jun Rao  wrote:
>
> > Hi, Mayuresh,
> >
> > It seems to me that there are two common use cases of authorizer. (1) Use
> > the default SimpleAuthorizer and the kafka-acl to do authorization. (2)
> Use
> > a customized authorizer and an external tool for authorization. Do you
> > think there is a use case for a customized authorizer and kafka-acl at
> the
> > same time? If not, it's better not to complicate the kafka-acl api.
> >
> > Thanks,
> >
> > Jun
> >
> >
> >
> > On Mon, Feb 13, 2017 at 10:35 AM, Mayuresh Gharat <
> > gharatmayures...@gmail.com> wrote:
> >
> > > Hi Jun,
> > >
> > > Thanks for the review and comments. Please find the replies inline :
> > >
> > > This is so that in the future, we can extend to types like group.
> > > ---> Yep, I did think the same. But since the SocketServer was always
> > > creating User type, it wasn't actually used. If we go ahead with
> changes
> > in
> > > this KIP, we will give this power of creating different Principal types
> > to
> > > the PrincipalBuilder (for which users can define their own). In that way
> > Kafka
> > > will not have to deal with handling this. So the Principal building and
> > > Authorization will be opaque to Kafka which seems like an expected
> > > behavior.
> > >
> > >
> > > Hmm, normally, the configurations you specify for plug-ins refer to
> those
> > > needed to construct the plug-in object. So, it's kind of weird to use
> > that
> > > to call a method. For example, why can't principalBuilderService.rest.
> > url
> > > be passed in through the configure() method and the implementation can
> > use
> > > that to build principal. This way, there is only a single method to
> > compute
> > > the principal in a consistent way in the broker and in the kafka-acl
> > tool.
> > > > We can do that as well. But since the rest url is not related to
> > the
> > > Principal, it seems out of place to me to pass it every time we have to
> > > create a Principal. I should replace "principalConfigs" with
> > > "principalProperties".
> > > I was trying to differentiate the configs/properties that are used to
> > > create the PrincipalBuilder class and the Principal/Principals itself.
> > >
> > >
> > > For LinkedIn's use case, do you actually use the kafka-acl tool? My
> > > understanding is that LinkedIn does authorization through an external
> > tool.
> > > > For Linkedin's use case we don't actually use the kafka-acl tool
> > > right now. As per the discussion that we had on
> > > https://issues.apache.org/jira/browse/KAFKA-4454, we thought that it
> > would
> > > be good to make kafka-acl tool changes, to make it flexible and we
> might
> > be
> > > even able to use it in future.
> > >
> > > It seems it's simpler if kafka-acl doesn't need to understand the
> > > principal builder. The tool does authorization based on a string name,
> > > which is expected to match the principal name. So, I am wondering why
> the
> > > tool needs to know the principal builder.
> > > > If we don't make this change, I am not sure how clients/end users
> > > will be able to use this tool if they have their own Authorizer that
> does
> > > Authorization based on Principal, that has more information apart from
> > name
> > > and type.
> > >
> > > What if we only make the following changes: pass the java principal in
> > > session and in
> > > SimpleAuthorizer, construct KafkaPrincipal from java principal name.
> Will
> > > that work for LinkedIn?
> > > > This can work for Linkedin but as explained above, it does not
> seem
> > > like a complete design from open source 

Re: Is it possible to receive duplicate messages from Kafka when...

2017-02-14 Thread Matthias J. Sax
Yes. That could happen.

Kafka provides at-least-once processing semantics if you commit messages
after processing.

You can avoid duplicates, if you commit offsets before processing, but
this might result in data loss.

Getting exactly-once is quite hard, and you will need to build your own
de-duplication logic.
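
To make that concrete, a minimal sketch of the commit-after-processing (at-least-once)
pattern, assuming a plain Java consumer from the 0.10-era client; the bootstrap address,
group id, topic, and the process() helper are placeholders:

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class AtLeastOnceExample {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("group.id", "my-group");                // placeholder
        props.put("enable.auto.commit", "false");         // commit manually, after processing
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic")); // placeholder topic
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    process(record); // a crash after this but before commit -> re-delivery
                }
                consumer.commitSync(); // committing only after processing = at-least-once
            }
        }
    }

    private static void process(ConsumerRecord<String, String> record) {
        System.out.println(record.offset() + ": " + record.value());
    }
}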


-Matthias

On 2/10/17 10:40 AM, Michaud, Ben A wrote:
> Is it possible to receive duplicate messages from Kafka 0.9.0.1 or 0.10.1.0 
> when you have a topic with three partitions, and one consumer group with 
> three consumer clients. One client stops consuming and is taken offline. 
> These clients do not commit offset immediately, but the offsets are committed 
> automatically after a default wait time setting. The partition assigned to 
> the client that goes down is moved to another client in the same group 
> automatically.
> 
> Meanwhile, the client that went down gets some TLC, still holds some messages 
> that were retrieved but never fully processed. When it comes back up, it 
> happily completes processing the data and writes it to an HDFS.
> 
> Will the second client be given uncommitted messages that the first client 
> had already received, but never committed? This would result in duplicate 
> messages on HDFS, which is what we witnessed this week when just such a thing 
> happened.
> 
> Regards,
> Ben
> 
> 
> 
> This e-mail, including attachments, may include confidential and/or
> proprietary information, and may be used only by the person or entity
> to which it is addressed. If the reader of this e-mail is not the intended
> recipient or his or her authorized agent, the reader is hereby notified
> that any dissemination, distribution or copying of this e-mail is
> prohibited. If you have received this e-mail in error, please notify the
> sender by replying to this message and delete this e-mail immediately.
> 



signature.asc
Description: OpenPGP digital signature


Re: Correct prefetching of data to KTable-like structure on application startup

2017-02-14 Thread Matthias J. Sax
Jan,

If I understand you problem correctly, you do something like this on
startup (I simplify to single partition)

// note: the end offset is the offset of the *next* record to be written,
// so the last existing record has offset endOffset - 1
endOffset = consumer.endOffsets(...)

while (!done) {
  for (ConsumerRecord r : consumer.poll(100)) {
    // do some processing
    if (r.offset() >= endOffset - 1) {
      done = true;
      break;
    }
  }
}

If your partition is empty, poll() never returns anything and thus you
loop forever.

However, to solve this problem, you can simply check the "start offset"
of the partition before the loop. If start and end offset are the same,
the partition is empty and you never call poll.

startOffset = consumer.beginningOffsets(...)
endOffset = consumer.endOffsets(...)

if (startOffset < endOffset) {   // partition is non-empty
  while (!done) {
    for (ConsumerRecord r : consumer.poll(100)) {
      // do some processing
      if (r.offset() >= endOffset - 1) {
        done = true;
        break;
      }
    }
  }
}


Does this help?


-Matthias

On 2/14/17 12:31 AM, Jan Lukavský wrote:
> Hi Matthias,
> 
> I understand that the local cache will not be automatically cleared, but
> that is not an issue for me now.
> 
> The problem I see is still the same as at the beginning - even caching
> data to RocksDB in KafkaStreams implementation might (I would say)
> suffer from this issue. When using time based data retention (for
> whatever reason, maybe in combination with the log compation, but the
> the issue is there irrespective to whether the log compation is used or
> not), it is possible that some partition will report nonzero "next"
> offset, but will not be able to deliver any message to the KafkaConsumer
> (because the partition is emptied by the data retention) and therefore
> the consumer will not be able to finish the materialization of the topic
> to local store (either RocksDB or any other cache) and therefore will
> not be able to start processing the KStream. If I understand the problem
> right, then using timestamp will not help either, because there must be
> some sort of vector clock with a time dimension for each input
> partition, and the empty partition will not be able to move the
> timestamp any further, and therefore the whole system will remain
> blocked at timestamp 0, because the vector clock usually calculates
> minimum from all time dimensions.
> 
> Does that make any sense, or am I doing something fundamentally wrong? :)
> 
> Thanks again for any thoughts,
> 
>  Jan
> 
> 
> On 02/13/2017 06:37 PM, Matthias J. Sax wrote:
>> Jan,
>>
>> brokers with version 0.10.1 or higher allow setting both topic cleanup
>> policies in combination:
>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-71%3A+Enable+log+compaction+and+deletion+to+co-exist
>>
>>
>> However, this will only delete data in you changelog topic but not in
>> your RocksDB -- if you want to get data deleted in RocksDB, you would
>> need to send tombstone messages for those keys. It's kinda tricky to get
>> this done.
>>
>> An "brute force" alternative would be, stop the application, delete the
>> local state directory, and restart. This will force Streams to recreate
>> the RocksDB files from the changelog and thus only load keys that were
>> not deleted. But this is of course a quite expensive approach and you
>> should be very careful about using it.
>>
>>
>> -Matthias
>>
>>
>> On 2/13/17 12:25 AM, Jan Lukavský wrote:
>>> Hi Michael,
>>>
>>> sorry for my late answer. Configuring the topic as you suggest is one
>>> option (and I will configure it that way), but I wanted to combine the
>>> two data retention mechanisms (if possible). I would like to use log
>>> compaction, so that I will always get at least the last message for
>>> given key, but I would also like to use the classical temporal data
>>> retention, which would function as a sort of TTL for the keys - if a key
>>> doesn't get an update for the configured period of time, if could be
>>> removed. That way I could ensure that out-dated keys could be removed.
>>>
>>> Is there any other option for this? And can kafka be configured this
>>> way?
>>>
>>> Best,
>>>
>>>   Jan
>>>
>>> On 02/09/2017 12:08 PM, Michael Noll wrote:
 Jan,

>- if I don't send any data to a kafka partition for a period longer
> than
 the data retention interval, then all data from the partition is wiped
 out

 If I interpret your first and second message in this email thread
 correctly, then you are talking only about your "state topic" here,
 i.e.
 the topic that you read into a KTable.  You should configure this
 topic to
 use log compaction, which will ensure that the latest value for a
 given key
 will never be wiped.  So even if you don't send any data to a Kafka
 partition of this (now log-compacted) "state topic" for a long
 period of
 time, you'd always have access to (at least) the latest value for
 every key.

 Would that help?

 -Michael





 On Thu, Feb 9, 2017 at 10:16 AM, Jan Lukavský 

Re: [DISCUSS] KIP-123: Allow per stream/table timestamp extractor

2017-02-14 Thread Matthias J. Sax
Mathieu,

I personally agree with your observation, and we have plans to submit a
KIP like this. If you want to drive this discussion feel free to start
the KIP by yourself!

Having said that, for this KIP we might want to focus the discussion on
the actual feature that gets added: allowing a different TS-Extractor to be
specified for different inputs.



-Matthias

On 2/14/17 4:54 PM, Mathieu Fenniak wrote:
> Hi Jeyhun,
> 
> This KIP might not be the appropriate time, but my first thought reading it
> is that it might make sense to introduce a builder-style API rather than
> adding a mix of new method overloads with independent optional parameters.
> :-)
> 
> eg. stream(), table(), globalTable(), addSource(), could all accept a
> "TopicReference" parameter that can be built like:
> TopicReference("my-topic").keySerde(...).valueSerde(...).autoOffsetReset(...).timestampExtractor(...).build().
> 
> Mathieu
> 
> 
> On Tue, Feb 14, 2017 at 5:31 PM, Jeyhun Karimov 
> wrote:
> 
>> Dear community,
>>
>> I want to share the KIP-123 [1] which is based on issue KAFKA-4144 [2]. You
>> can check the PR in [3].
>>
>> I would like to get your comments.
>>
>> [1]
>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=68714788
>> [2] https://issues.apache.org/jira/browse/KAFKA-4144
>> [3] https://github.com/apache/kafka/pull/2466
>>
>>
>> Cheers,
>> Jeyhun
>> --
>> -Cheers
>>
>> Jeyhun
>>
> 



signature.asc
Description: OpenPGP digital signature


Re: [DISCUSS] KIP-123: Allow per stream/table timestamp extractor

2017-02-14 Thread Mathieu Fenniak
Hi Jeyhun,

This KIP might not be the appropriate time, but my first thought reading it
is that it might make sense to introduce a builder-style API rather than
adding a mix of new method overloads with independent optional parameters.
:-)

eg. stream(), table(), globalTable(), addSource(), could all accept a
"TopicReference" parameter that can be built like:
TopicReference("my-topic").keySerde(...).valueSerde(...).autoOffsetReset(...).timestampExtractor(...).build().

Mathieu
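
For illustration, a minimal and purely hypothetical sketch of such a builder; TopicReference
and all of its methods are made up for this example and are not part of the Kafka Streams API:

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.processor.TimestampExtractor;

// Hypothetical fluent description of a source topic; illustrative only.
public final class TopicReference<K, V> {
    private final String topic;
    private Serde<K> keySerde;
    private Serde<V> valueSerde;
    private TimestampExtractor timestampExtractor;

    private TopicReference(String topic) {
        this.topic = topic;
    }

    public static <K, V> TopicReference<K, V> of(String topic) {
        return new TopicReference<>(topic);
    }

    public TopicReference<K, V> keySerde(Serde<K> serde) {
        this.keySerde = serde;
        return this;
    }

    public TopicReference<K, V> valueSerde(Serde<V> serde) {
        this.valueSerde = serde;
        return this;
    }

    public TopicReference<K, V> timestampExtractor(TimestampExtractor extractor) {
        this.timestampExtractor = extractor;
        return this;
    }

    // builder.stream(TopicReference.of("my-topic").timestampExtractor(...)) would then
    // replace the growing set of positional overloads on stream()/table()/addSource().
}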


On Tue, Feb 14, 2017 at 5:31 PM, Jeyhun Karimov 
wrote:

> Dear community,
>
> I want to share the KIP-123 [1] which is based on issue KAFKA-4144 [2]. You
> can check the PR in [3].
>
> I would like to get your comments.
>
> [1]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=68714788
> [2] https://issues.apache.org/jira/browse/KAFKA-4144
> [3] https://github.com/apache/kafka/pull/2466
>
>
> Cheers,
> Jeyhun
> --
> -Cheers
>
> Jeyhun
>


[DISCUSS] KIP-123: Allow per stream/table timestamp extractor

2017-02-14 Thread Jeyhun Karimov
Dear community,

I want to share the KIP-123 [1] which is based on issue KAFKA-4144 [2]. You
can check the PR in [3].

I would like to get your comments.

[1]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=68714788
[2] https://issues.apache.org/jira/browse/KAFKA-4144
[3] https://github.com/apache/kafka/pull/2466


Cheers,
Jeyhun
-- 
-Cheers

Jeyhun


[jira] [Updated] (KAFKA-4665) Inconsistent handling of non-existing topics in offset fetch handling

2017-02-14 Thread Vahid Hashemian (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4665?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vahid Hashemian updated KAFKA-4665:
---
Status: Patch Available  (was: In Progress)

> Inconsistent handling of non-existing topics in offset fetch handling
> -
>
> Key: KAFKA-4665
> URL: https://issues.apache.org/jira/browse/KAFKA-4665
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, core
>Reporter: Jason Gustafson
>Assignee: Vahid Hashemian
> Fix For: 0.10.3.0
>
>
> For version 0 of the offset fetch API, the broker returns 
> UNKNOWN_TOPIC_OR_PARTITION for any topics/partitions which do not exist at 
> the time of fetching. In later versions, we skip this check. We do, however, 
> continue to return UNKNOWN_TOPIC_OR_PARTITION for authorization errors (i.e. 
> if the principal does not have Describe access to the corresponding topic). 
> We should probably make this behavior consistent across versions.
> Note also that currently the consumer raises {{KafkaException}} when it 
> encounters an UNKNOWN_TOPIC_OR_PARTITION error in the offset fetch response, 
> which is inconsistent with how we usually handle this error. This probably 
> doesn't cause any problems currently only because of the inconsistency 
> mentioned in the first paragraph above.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: KIP-121 [VOTE]: Add KStream peek method

2017-02-14 Thread Zakee
+1 

-Zakee
> On Feb 14, 2017, at 1:56 PM, Jay Kreps  wrote:
> 
> +1
> 
> Nice improvement.
> 
> -Jay
> 
> On Tue, Feb 14, 2017 at 1:22 PM, Steven Schlansker <
> sschlans...@opentable.com> wrote:
> 
>> Hi, it looks like I have 2 of the 3 minimum votes, can a third voter
>> please consider this KIP?
>> Thanks.
>> 
>> (PS - new revision on GitHub PR with hopefully the last round of
>> improvements)
>> 
>>> On Feb 8, 2017, at 9:06 PM, Matthias J. Sax 
>> wrote:
>>> 
>>> +1
>>> 
>>> On 2/8/17 4:51 PM, Gwen Shapira wrote:
 +1 (binding)
 
 On Wed, Feb 8, 2017 at 4:45 PM, Steven Schlansker
  wrote:
> Hi everyone,
> 
> Thank you for constructive feedback on KIP-121,
>> KStream.peek(ForeachAction) ;
> it seems like it is time to call a vote which I hope will pass easily
>> :)
> 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>> 121%3A+Add+KStream+peek+method
> 
> I believe the PR attached is already in good shape to consider merging:
> 
> https://github.com/apache/kafka/pull/2493
> 
> Thanks!
> Steven
> 
 
 
 
>>> 
>> 
>> 




[jira] [Commented] (KAFKA-4665) Inconsistent handling of non-existing topics in offset fetch handling

2017-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4665?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15866838#comment-15866838
 ] 

ASF GitHub Bot commented on KAFKA-4665:
---

GitHub user vahidhashemian opened a pull request:

https://github.com/apache/kafka/pull/2550

KAFKA-4665: Normalize handling of non-existing topics/partitions in 
fetching offsets

This PR brings some consistency around how non-existing topics or 
partitions are handled when fetching offsets using different versions of offset 
fetch API.
In particular, it now
* returns `UNKNOWN_TOPIC_OR_PARTITION` for non-existing topics or 
partitions in versions 1 and later (similar to how it is done in version 0)
* throws a `KafkaException` when consumers consume from a non-existing 
topic or partition, or from a topic to which they do not have Describe access.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/vahidhashemian/kafka KAFKA-4665

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2550.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2550






> Inconsistent handling of non-existing topics in offset fetch handling
> -
>
> Key: KAFKA-4665
> URL: https://issues.apache.org/jira/browse/KAFKA-4665
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, core
>Reporter: Jason Gustafson
>Assignee: Vahid Hashemian
> Fix For: 0.10.3.0
>
>
> For version 0 of the offset fetch API, the broker returns 
> UNKNOWN_TOPIC_OR_PARTITION for any topics/partitions which do not exist at 
> the time of fetching. In later versions, we skip this check. We do, however, 
> continue to return UNKNOWN_TOPIC_OR_PARTITION for authorization errors (i.e. 
> if the principal does not have Describe access to the corresponding topic). 
> We should probably make this behavior consistent across versions.
> Note also that currently the consumer raises {{KafkaException}} when it 
> encounters an UNKNOWN_TOPIC_OR_PARTITION error in the offset fetch response, 
> which is inconsistent with how we usually handle this error. This probably 
> doesn't cause any problems currently only because of the inconsistency 
> mentioned in the first paragraph above.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] kafka pull request #2550: KAFKA-4665: Normalize handling of non-existing top...

2017-02-14 Thread vahidhashemian
GitHub user vahidhashemian opened a pull request:

https://github.com/apache/kafka/pull/2550

KAFKA-4665: Normalize handling of non-existing topics/partitions in 
fetching offsets

This PR brings some consistency around how non-existing topics or 
partitions are handled when fetching offsets using different versions of offset 
fetch API.
In particular, it now
* returns `UNKNOWN_TOPIC_OR_PARTITION` for non-existing topics or 
partitions in versions 1 and later (similar to how it is done in version 0)
* throws a `KafkaException` when consumers consume from a non-existing 
topic or partition, or from a topic to which they do not have Describe access.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/vahidhashemian/kafka KAFKA-4665

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2550.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2550






---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] kafka pull request #2549: Add subtree .gitignore files themselves to gitigno...

2017-02-14 Thread stevenschlansker
GitHub user stevenschlansker opened a pull request:

https://github.com/apache/kafka/pull/2549

Add subtree .gitignore files themselves to gitignore

This seems like a silly rule, but for example the Eclipse Gradle integration
will 'helpfully' generate .gitignore files in every project that then
show as dirty.  For example, if you take a vanilla Eclipse install and 
import
Kafka, this is the status you get:

```
??  clients/.gitignore
??  connect/api/.gitignore
??  connect/file/.gitignore
??  connect/json/.gitignore
??  connect/runtime/.gitignore
??  connect/transforms/.gitignore
??  examples/bin/.gitignore
??  log4j-appender/.gitignore
??  streams/.gitignore
??  streams/examples/.gitignore
??  tools/.gitignore
```

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/stevenschlansker/kafka ignore-sub-gitignore

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2549.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2549


commit 015f384d92d59553b384325107f66e32ae8189bc
Author: Steven Schlansker 
Date:   2017-02-14T22:03:56Z

Add subtree .gitignore files themselves to gitignore

This seems like a silly rule, but for example the Eclipse Gradle integration
will 'helpfully' generate .gitignore files in every project that then
show as dirty.  For example, if you take a vanilla Eclipse install and 
import
Kafka, this is the status you get:

```
??  clients/.gitignore
??  connect/api/.gitignore
??  connect/file/.gitignore
??  connect/json/.gitignore
??  connect/runtime/.gitignore
??  connect/transforms/.gitignore
??  examples/bin/.gitignore
??  log4j-appender/.gitignore
??  streams/.gitignore
??  streams/examples/.gitignore
??  tools/.gitignore
```




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: [VOTE] KIP-98: Exactly Once Delivery and Transactional Messaging

2017-02-14 Thread Jay Kreps
+1

Super happy with how this turned out. It's been a long journey since we
started thinking about this 3+ years ago. Can't wait to see it in
code---this is a big one! :-)

-Jay

On Wed, Feb 1, 2017 at 8:13 PM, Guozhang Wang  wrote:

> Hi all,
>
> We would like to start the voting process for KIP-98. The KIP can be found
> at
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 98+-+Exactly+Once+Delivery+and+Transactional+Messaging
>
> Discussion thread can be found here:
>
> http://search-hadoop.com/m/Kafka/uyzND1jwZrr7HRHf?subj=+
> DISCUSS+KIP+98+Exactly+Once+Delivery+and+Transactional+Messaging
>
> Thanks,
>
> --
> -- Guozhang
>


Jenkins build is back to normal : kafka-trunk-jdk8 #1274

2017-02-14 Thread Apache Jenkins Server
See 



Re: KIP-121 [VOTE]: Add KStream peek method

2017-02-14 Thread Jay Kreps
+1

Nice improvement.

-Jay

On Tue, Feb 14, 2017 at 1:22 PM, Steven Schlansker <
sschlans...@opentable.com> wrote:

> Hi, it looks like I have 2 of the 3 minimum votes, can a third voter
> please consider this KIP?
> Thanks.
>
> (PS - new revision on GitHub PR with hopefully the last round of
> improvements)
>
> > On Feb 8, 2017, at 9:06 PM, Matthias J. Sax 
> wrote:
> >
> > +1
> >
> > On 2/8/17 4:51 PM, Gwen Shapira wrote:
> >> +1 (binding)
> >>
> >> On Wed, Feb 8, 2017 at 4:45 PM, Steven Schlansker
> >>  wrote:
> >>> Hi everyone,
> >>>
> >>> Thank you for constructive feedback on KIP-121,
> KStream.peek(ForeachAction) ;
> >>> it seems like it is time to call a vote which I hope will pass easily
> :)
> >>>
> >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 121%3A+Add+KStream+peek+method
> >>>
> >>> I believe the PR attached is already in good shape to consider merging:
> >>>
> >>> https://github.com/apache/kafka/pull/2493
> >>>
> >>> Thanks!
> >>> Steven
> >>>
> >>
> >>
> >>
> >
>
>


Re: [VOTE] KIP-82 Add Record Headers

2017-02-14 Thread Jay Kreps
Couple of things I think we still need to work out:

   1. I think we agree about the key, but I think we haven't talked about
   the value yet. I think if our goal is an open ecosystem of these headers
   spread across many plugins from many systems we should consider making this
   a string as well so it can be printed, set via a UI, set in config, etc.
   Basically encouraging pluggable serialization formats here will lead to a
   bit of a tower of babel.
   2. This proposal still includes a pretty big change to our serialization
   and protocol definition layer. Essentially it is introducing an optional
   type, where the format is data dependent. I think this is actually a big
   change though it doesn't seem like it. It means you can no longer specify
   this type with our type definition DSL, and likewise it requires custom
   handling in client libs. This isn't a huge thing, since the Record
   definition is custom anyway, but I think this kind of protocol
   inconsistency is very non-desirable and ties you to hand-coding things. I
   think the type should instead be [Key Value] in our BNF, where key and
   value are both short strings as used elsewhere. This brings it in line with
   the rest of the protocol.
   3. Could we get more specific about the exact Java API change to
   ProducerRecord, ConsumerRecord, Record, etc?

-Jay
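
To make point 2 concrete, a purely illustrative sketch of headers as an ordered list of
[key, value] pairs with strings on both sides; none of these class or method names come
from the KIP or from Kafka itself:

import java.util.ArrayList;
import java.util.List;

// Hypothetical header container; illustrative only, not a Kafka API.
public final class StringHeaders {

    public static final class Header {
        public final String key;
        public final String value;

        public Header(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    private final List<Header> headers = new ArrayList<>();

    // String values keep headers human-readable: they can be printed,
    // set via a UI, or set in config.
    public StringHeaders add(String key, String value) {
        headers.add(new Header(key, value));
        return this;
    }

    public List<Header> all() {
        return headers;
    }
}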

On Tue, Feb 14, 2017 at 9:42 AM, Michael Pearce 
wrote:

> Hi all,
>
> We would like to start the voting process for KIP-82 – Add record headers.
> The KIP can be found
> at
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 82+-+Add+Record+Headers
>
> Discussion thread(s) can be found here:
>
> http://search-hadoop.com/m/Kafka/uyzND1nSTOHTvj81?subj=
> Re+DISCUSS+KIP+82+Add+Record+Headers
> http://search-hadoop.com/m/Kafka/uyzND1Arxt22Tvj81?subj=
> Re+DISCUSS+KIP+82+Add+Record+Headers
> http://search-hadoop.com/?project=Kafka=KIP-82
>
>
>
> Thanks,
> Mike
>
> The information contained in this email is strictly confidential and for
> the use of the addressee only, unless otherwise indicated. If you are not
> the intended recipient, please do not read, copy, use or disclose to others
> this message or any attachment. Please also notify the sender by replying
> to this email or by telephone (+44(020 7896 0011) and then delete the email
> and any copies of it. Opinions, conclusion (etc) that do not relate to the
> official business of this company shall be understood as neither given nor
> endorsed by it. IG is a trading name of IG Markets Limited (a company
> registered in England and Wales, company number 04008957) and IG Index
> Limited (a company registered in England and Wales, company number
> 01190902). Registered address at Cannon Bridge House, 25 Dowgate Hill,
> London EC4R 2YA. Both IG Markets Limited (register number 195355) and IG
> Index Limited (register number 114059) are authorised and regulated by the
> Financial Conduct Authority.
>


Re: [VOTE] KIP-82 Add Record Headers

2017-02-14 Thread Renu Tewari
+1  after the comprehensive discussion great to see this moving to a vote.



On Tue, Feb 14, 2017 at 1:07 PM, Onur Karaman 
wrote:

> +1
>
> On Tue, Feb 14, 2017 at 10:35 AM, radai 
> wrote:
>
> > +1 from me.
> >
> > also - a more usable link to the discussion thread:
> > http://markmail.org/message/x5wlkieexinovsz3
> >
> > On Tue, Feb 14, 2017 at 9:42 AM, Michael Pearce 
> > wrote:
> >
> > > Hi all,
> > >
> > > We would like to start the voting process for KIP-82 – Add record
> > headers.
> > > The KIP can be found
> > > at
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > 82+-+Add+Record+Headers
> > >
> > > Discussion thread(s) can be found here:
> > >
> > > http://search-hadoop.com/m/Kafka/uyzND1nSTOHTvj81?subj=
> > > Re+DISCUSS+KIP+82+Add+Record+Headers
> > > http://search-hadoop.com/m/Kafka/uyzND1Arxt22Tvj81?subj=
> > > Re+DISCUSS+KIP+82+Add+Record+Headers
> > > http://search-hadoop.com/?project=Kafka=KIP-82
> > >
> > >
> > >
> > > Thanks,
> > > Mike
> > >
> > > The information contained in this email is strictly confidential and
> for
> > > the use of the addressee only, unless otherwise indicated. If you are
> not
> > > the intended recipient, please do not read, copy, use or disclose to
> > others
> > > this message or any attachment. Please also notify the sender by
> replying
> > > to this email or by telephone (+44(020 7896 0011) and then delete the
> > email
> > > and any copies of it. Opinions, conclusion (etc) that do not relate to
> > the
> > > official business of this company shall be understood as neither given
> > nor
> > > endorsed by it. IG is a trading name of IG Markets Limited (a company
> > > registered in England and Wales, company number 04008957) and IG Index
> > > Limited (a company registered in England and Wales, company number
> > > 01190902). Registered address at Cannon Bridge House, 25 Dowgate Hill,
> > > London EC4R 2YA. Both IG Markets Limited (register number 195355) and
> IG
> > > Index Limited (register number 114059) are authorised and regulated by
> > the
> > > Financial Conduct Authority.
> > >
> >
>


Re: KIP-121 [VOTE]: Add KStream peek method

2017-02-14 Thread Steven Schlansker
Hi, it looks like I have 2 of the 3 minimum votes, can a third voter please 
consider this KIP?
Thanks.

(PS - new revision on GitHub PR with hopefully the last round of improvements)

> On Feb 8, 2017, at 9:06 PM, Matthias J. Sax  wrote:
> 
> +1
> 
> On 2/8/17 4:51 PM, Gwen Shapira wrote:
>> +1 (binding)
>> 
>> On Wed, Feb 8, 2017 at 4:45 PM, Steven Schlansker
>>  wrote:
>>> Hi everyone,
>>> 
>>> Thank you for constructive feedback on KIP-121, 
>>> KStream.peek(ForeachAction) ;
>>> it seems like it is time to call a vote which I hope will pass easily :)
>>> 
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-121%3A+Add+KStream+peek+method
>>> 
>>> I believe the PR attached is already in good shape to consider merging:
>>> 
>>> https://github.com/apache/kafka/pull/2493
>>> 
>>> Thanks!
>>> Steven
>>> 
>> 
>> 
>> 
> 



signature.asc
Description: Message signed with OpenPGP using GPGMail


Re: [VOTE] KIP-82 Add Record Headers

2017-02-14 Thread Onur Karaman
+1

On Tue, Feb 14, 2017 at 10:35 AM, radai  wrote:

> +1 from me.
>
> also - a more usable link to the discussion thread:
> http://markmail.org/message/x5wlkieexinovsz3
>
> On Tue, Feb 14, 2017 at 9:42 AM, Michael Pearce 
> wrote:
>
> > Hi all,
> >
> > We would like to start the voting process for KIP-82 – Add record
> headers.
> > The KIP can be found
> > at
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 82+-+Add+Record+Headers
> >
> > Discussion thread(s) can be found here:
> >
> > http://search-hadoop.com/m/Kafka/uyzND1nSTOHTvj81?subj=
> > Re+DISCUSS+KIP+82+Add+Record+Headers
> > http://search-hadoop.com/m/Kafka/uyzND1Arxt22Tvj81?subj=
> > Re+DISCUSS+KIP+82+Add+Record+Headers
> > http://search-hadoop.com/?project=Kafka=KIP-82
> >
> >
> >
> > Thanks,
> > Mike
> >
> > The information contained in this email is strictly confidential and for
> > the use of the addressee only, unless otherwise indicated. If you are not
> > the intended recipient, please do not read, copy, use or disclose to
> others
> > this message or any attachment. Please also notify the sender by replying
> > to this email or by telephone (+44(020 7896 0011) and then delete the
> email
> > and any copies of it. Opinions, conclusion (etc) that do not relate to
> the
> > official business of this company shall be understood as neither given
> nor
> > endorsed by it. IG is a trading name of IG Markets Limited (a company
> > registered in England and Wales, company number 04008957) and IG Index
> > Limited (a company registered in England and Wales, company number
> > 01190902). Registered address at Cannon Bridge House, 25 Dowgate Hill,
> > London EC4R 2YA. Both IG Markets Limited (register number 195355) and IG
> > Index Limited (register number 114059) are authorised and regulated by
> the
> > Financial Conduct Authority.
> >
>


Build failed in Jenkins: kafka-trunk-jdk7 #1938

2017-02-14 Thread Apache Jenkins Server
See 

Changes:

[wangguoz] MINOR: don't throw CommitFailedException during suspendTasksAndState

--
[...truncated 15979 lines...]
org.apache.kafka.common.record.MemoryRecordsTest > 
testFilterToPreservesLogAppendTime[9] PASSED

org.apache.kafka.common.record.MemoryRecordsTest > testFilterTo[9] STARTED

org.apache.kafka.common.record.MemoryRecordsTest > testFilterTo[9] PASSED

org.apache.kafka.common.record.MemoryRecordsTest > testIterator[9] STARTED

org.apache.kafka.common.record.MemoryRecordsTest > testIterator[9] PASSED

org.apache.kafka.common.record.MemoryRecordsTest > testHasRoomForMethod[10] 
STARTED

org.apache.kafka.common.record.MemoryRecordsTest > testHasRoomForMethod[10] 
PASSED

org.apache.kafka.common.record.MemoryRecordsTest > 
testFilterToPreservesLogAppendTime[10] STARTED

org.apache.kafka.common.record.MemoryRecordsTest > 
testFilterToPreservesLogAppendTime[10] PASSED

org.apache.kafka.common.record.MemoryRecordsTest > testFilterTo[10] STARTED

org.apache.kafka.common.record.MemoryRecordsTest > testFilterTo[10] PASSED

org.apache.kafka.common.record.MemoryRecordsTest > testIterator[10] STARTED

org.apache.kafka.common.record.MemoryRecordsTest > testIterator[10] PASSED

org.apache.kafka.common.record.MemoryRecordsTest > testHasRoomForMethod[11] 
STARTED

org.apache.kafka.common.record.MemoryRecordsTest > testHasRoomForMethod[11] 
PASSED

org.apache.kafka.common.record.MemoryRecordsTest > 
testFilterToPreservesLogAppendTime[11] STARTED

org.apache.kafka.common.record.MemoryRecordsTest > 
testFilterToPreservesLogAppendTime[11] PASSED

org.apache.kafka.common.record.MemoryRecordsTest > testFilterTo[11] STARTED

org.apache.kafka.common.record.MemoryRecordsTest > testFilterTo[11] PASSED

org.apache.kafka.common.record.MemoryRecordsTest > testIterator[11] STARTED

org.apache.kafka.common.record.MemoryRecordsTest > testIterator[11] PASSED

org.apache.kafka.common.record.MemoryRecordsTest > testHasRoomForMethod[12] 
STARTED

org.apache.kafka.common.record.MemoryRecordsTest > testHasRoomForMethod[12] 
PASSED

org.apache.kafka.common.record.MemoryRecordsTest > 
testFilterToPreservesLogAppendTime[12] STARTED

org.apache.kafka.common.record.MemoryRecordsTest > 
testFilterToPreservesLogAppendTime[12] PASSED

org.apache.kafka.common.record.MemoryRecordsTest > testFilterTo[12] STARTED

org.apache.kafka.common.record.MemoryRecordsTest > testFilterTo[12] PASSED

org.apache.kafka.common.record.MemoryRecordsTest > testIterator[12] STARTED

org.apache.kafka.common.record.MemoryRecordsTest > testIterator[12] PASSED

org.apache.kafka.common.record.MemoryRecordsTest > testHasRoomForMethod[13] 
STARTED

org.apache.kafka.common.record.MemoryRecordsTest > testHasRoomForMethod[13] 
PASSED

org.apache.kafka.common.record.MemoryRecordsTest > 
testFilterToPreservesLogAppendTime[13] STARTED

org.apache.kafka.common.record.MemoryRecordsTest > 
testFilterToPreservesLogAppendTime[13] PASSED

org.apache.kafka.common.record.MemoryRecordsTest > testFilterTo[13] STARTED

org.apache.kafka.common.record.MemoryRecordsTest > testFilterTo[13] PASSED

org.apache.kafka.common.record.MemoryRecordsTest > testIterator[13] STARTED

org.apache.kafka.common.record.MemoryRecordsTest > testIterator[13] PASSED

org.apache.kafka.common.record.MemoryRecordsTest > testHasRoomForMethod[14] 
STARTED

org.apache.kafka.common.record.MemoryRecordsTest > testHasRoomForMethod[14] 
PASSED

org.apache.kafka.common.record.MemoryRecordsTest > 
testFilterToPreservesLogAppendTime[14] STARTED

org.apache.kafka.common.record.MemoryRecordsTest > 
testFilterToPreservesLogAppendTime[14] PASSED

org.apache.kafka.common.record.MemoryRecordsTest > testFilterTo[14] STARTED

org.apache.kafka.common.record.MemoryRecordsTest > testFilterTo[14] PASSED

org.apache.kafka.common.record.MemoryRecordsTest > testIterator[14] STARTED

org.apache.kafka.common.record.MemoryRecordsTest > testIterator[14] PASSED

org.apache.kafka.common.record.MemoryRecordsTest > testHasRoomForMethod[15] 
STARTED

org.apache.kafka.common.record.MemoryRecordsTest > testHasRoomForMethod[15] 
PASSED

org.apache.kafka.common.record.MemoryRecordsTest > 
testFilterToPreservesLogAppendTime[15] STARTED

org.apache.kafka.common.record.MemoryRecordsTest > 
testFilterToPreservesLogAppendTime[15] PASSED

org.apache.kafka.common.record.MemoryRecordsTest > testFilterTo[15] STARTED

org.apache.kafka.common.record.MemoryRecordsTest > testFilterTo[15] PASSED

org.apache.kafka.common.record.MemoryRecordsTest > testIterator[15] STARTED

org.apache.kafka.common.record.MemoryRecordsTest > testIterator[15] PASSED

org.apache.kafka.common.serialization.SerializationTest > testSerdeFromUnknown 
STARTED

org.apache.kafka.common.serialization.SerializationTest > testSerdeFromUnknown 
PASSED

org.apache.kafka.common.serialization.SerializationTest > testDoubleSerializer 
STARTED


[jira] [Comment Edited] (KAFKA-4277) creating ephemeral node already exist

2017-02-14 Thread Wrikken (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15866415#comment-15866415
 ] 

Wrikken edited comment on KAFKA-4277 at 2/14/17 7:16 PM:
-

Reproduced on our side with Scala 2.11 kafka_2.11-0.10.1.1, zookeeper v. 3.4.5 
(3.4.5+dfsg-2+deb8u1 from Debian Jessie).

[2017-02-13 06:05:11,793] INFO re-registering broker info in ZK for broker 1 
(kafka.server.KafkaHealthcheck$SessionExpireListener)
[2017-02-13 06:05:11,795] INFO Creating /brokers/ids/1 (is it secure? false) 
(kafka.utils.ZKCheckedEphemeral)
[2017-02-13 06:05:11,797] INFO Partition [,5] on broker 1: Shrinking ISR 
for partition [,5] from 3,1 to 1 (kafka.cluster.Partition)
[2017-02-13 06:05:11,801] INFO Partition [__consumer_offsets,31] on broker 1: 
Shrinking ISR for partition [__consumer_offsets,31] from 3,2,1 to 2,1 
(kafka.cluster.Partition)
[2017-02-13 06:05:11,801] INFO Result of znode creation is: NODEEXISTS 
(kafka.utils.ZKCheckedEphemeral)

The more annoying issue is I can deal with Kafka fully failing at that point, 
which would prompt a restart. However, currently it 'silently' fails, but keeps 
running, forcing us to set up extra monitoring to force a restart when this 
happens.


was (Author: wrikken):
Reproduced on our side with Scala 2.11 kafka_2.11-0.10.1.1, zookeeper v. 3.4.5 
(3.4.5+dfsg-2+deb8u1 from Debian Jessie).

[2017-02-13 06:05:11,793] INFO re-registering broker info in ZK for broker 1 
(kafka.server.KafkaHealthcheck$SessionExpireListener)
[2017-02-13 06:05:11,795] INFO Creating /brokers/ids/1 (is it secure? false) 
(kafka.utils.ZKCheckedEphemeral)
[2017-02-13 06:05:11,797] INFO Partition [com.takeaway.events.order.create,5] 
on broker 1: Shrinking ISR for partition [com.takeaway.events.order.create,5] 
from 3,1 to 1 (kafka.cluster.Partition)
[2017-02-13 06:05:11,801] INFO Partition [__consumer_offsets,31] on broker 1: 
Shrinking ISR for partition [__consumer_offsets,31] from 3,2,1 to 2,1 
(kafka.cluster.Partition)
[2017-02-13 06:05:11,801] INFO Result of znode creation is: NODEEXISTS 
(kafka.utils.ZKCheckedEphemeral)

The more annoying issue is I can deal with Kafka fully failing at that point, 
which would prompt a restart. However, currently it 'silently' fails, but keeps 
running, forcing us to set up extra monitoring to force a restart when this 
happens.

> creating ephemeral node already exist
> -
>
> Key: KAFKA-4277
> URL: https://issues.apache.org/jira/browse/KAFKA-4277
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.0.0
>Reporter: Feixiang Yan
>
> I use zookeeper 3.4.6.
> The Zookeeper session timed out and zkClient's reconnect attempt failed. It then re-established 
> the session and, while re-registering broker info in ZK, hit a NODEEXISTS exception.
>  I think this is because the ephemeral node created by the old session has 
> not been removed. 
> I read the 
> [ZkUtils.scala|https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/utils/ZkUtils.scala]
>  of 0.8.1, createEphemeralPathExpectConflictHandleZKBug tries to create the node in a 
> while loop until creation succeeds. This can solve the issue. But in 
> [ZkUtils.scala|https://github.com/apache/kafka/blob/0.10.0.1/core/src/main/scala/kafka/utils/ZkUtils.scala]
>   0.10.1 the function was removed.
> {noformat}
> [2016-10-07 19:00:32,562] INFO Socket connection established to 
> 10.191.155.238/10.191.155.238:21819, initiating session 
> (org.apache.zookeeper.ClientCnxn)
> [2016-10-07 19:00:32,563] INFO zookeeper state changed (Expired) 
> (org.I0Itec.zkclient.ZkClient)
> [2016-10-07 19:00:32,564] INFO Unable to reconnect to ZooKeeper service, 
> session 0x1576b11f9b201bd has expired, closing socket connection 
> (org.apache.zookeeper.ClientCnxn)
> [2016-10-07 19:00:32,564] INFO Initiating client connection, 
> connectString=10.191.155.237:21819,10.191.155.238:21819,10.191.155.239:21819/cluster2
>  sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@ae71be2 
> (org.apache.zookeeper.ZooKeeper)
> [2016-10-07 19:00:32,566] INFO Opening socket connection to server 
> 10.191.155.237/10.191.155.237:21819. Will not attempt to authenticate using 
> SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
> [2016-10-07 19:00:32,566] INFO Socket connection established to 
> 10.191.155.237/10.191.155.237:21819, initiating session 
> (org.apache.zookeeper.ClientCnxn)
> [2016-10-07 19:00:32,566] INFO EventThread shut down 
> (org.apache.zookeeper.ClientCnxn)
> [2016-10-07 19:00:32,567] INFO Session establishment complete on server 
> 10.191.155.237/10.191.155.237:21819, sessionid = 0x1579ecd39c20006, 
> negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
> [2016-10-07 19:00:32,567] INFO zookeeper state changed (SyncConnected) 
> (org.I0Itec.zkclient.ZkClient)
> [2016-10-07 19:00:32,608] INFO re-registering broker info in ZK for broker 

[jira] [Commented] (KAFKA-4277) creating ephemeral node already exist

2017-02-14 Thread Wrikken (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15866415#comment-15866415
 ] 

Wrikken commented on KAFKA-4277:


Reproduced on our side with Scala 2.11 kafka_2.11-0.10.1.1, zookeeper v. 3.4.5 
(3.4.5+dfsg-2+deb8u1 from Debian Jessie).

[2017-02-13 06:05:11,793] INFO re-registering broker info in ZK for broker 1 
(kafka.server.KafkaHealthcheck$SessionExpireListener)
[2017-02-13 06:05:11,795] INFO Creating /brokers/ids/1 (is it secure? false) 
(kafka.utils.ZKCheckedEphemeral)
[2017-02-13 06:05:11,797] INFO Partition [com.takeaway.events.order.create,5] 
on broker 1: Shrinking ISR for partition [com.takeaway.events.order.create,5] 
from 3,1 to 1 (kafka.cluster.Partition)
[2017-02-13 06:05:11,801] INFO Partition [__consumer_offsets,31] on broker 1: 
Shrinking ISR for partition [__consumer_offsets,31] from 3,2,1 to 2,1 
(kafka.cluster.Partition)
[2017-02-13 06:05:11,801] INFO Result of znode creation is: NODEEXISTS 
(kafka.utils.ZKCheckedEphemeral)

The more annoying issue: I could live with Kafka failing completely at that 
point, since that would prompt a restart. Currently, however, it fails 
'silently' but keeps running, forcing us to set up extra monitoring to trigger 
a restart when this happens.

> creating ephemeral node already exist
> -
>
> Key: KAFKA-4277
> URL: https://issues.apache.org/jira/browse/KAFKA-4277
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.0.0
>Reporter: Feixiang Yan
>
> I use ZooKeeper 3.4.6.
> The ZooKeeper session timed out and zkClient's reconnect attempt failed. Then, 
> while re-establishing the session and re-registering the broker info in ZK, a 
> NODEEXISTS exception is thrown. I think this is because the ephemeral node 
> created by the old session has not been removed yet.
> I read 
> [ZkUtils.scala|https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/utils/ZkUtils.scala]
>  of 0.8.1; createEphemeralPathExpectConflictHandleZKBug tries to create the node 
> in a while loop until creation succeeds. This can solve the issue. But in 
> [ZkUtils.scala|https://github.com/apache/kafka/blob/0.10.0.1/core/src/main/scala/kafka/utils/ZkUtils.scala]
>  of 0.10.1 the function has been removed.
> {noformat}
> [2016-10-07 19:00:32,562] INFO Socket connection established to 
> 10.191.155.238/10.191.155.238:21819, initiating session 
> (org.apache.zookeeper.ClientCnxn)
> [2016-10-07 19:00:32,563] INFO zookeeper state changed (Expired) 
> (org.I0Itec.zkclient.ZkClient)
> [2016-10-07 19:00:32,564] INFO Unable to reconnect to ZooKeeper service, 
> session 0x1576b11f9b201bd has expired, closing socket connection 
> (org.apache.zookeeper.ClientCnxn)
> [2016-10-07 19:00:32,564] INFO Initiating client connection, 
> connectString=10.191.155.237:21819,10.191.155.238:21819,10.191.155.239:21819/cluster2
>  sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@ae71be2 
> (org.apache.zookeeper.ZooKeeper)
> [2016-10-07 19:00:32,566] INFO Opening socket connection to server 
> 10.191.155.237/10.191.155.237:21819. Will not attempt to authenticate using 
> SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
> [2016-10-07 19:00:32,566] INFO Socket connection established to 
> 10.191.155.237/10.191.155.237:21819, initiating session 
> (org.apache.zookeeper.ClientCnxn)
> [2016-10-07 19:00:32,566] INFO EventThread shut down 
> (org.apache.zookeeper.ClientCnxn)
> [2016-10-07 19:00:32,567] INFO Session establishment complete on server 
> 10.191.155.237/10.191.155.237:21819, sessionid = 0x1579ecd39c20006, 
> negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
> [2016-10-07 19:00:32,567] INFO zookeeper state changed (SyncConnected) 
> (org.I0Itec.zkclient.ZkClient)
> [2016-10-07 19:00:32,608] INFO re-registering broker info in ZK for broker 3 
> (kafka.server.KafkaHealthcheck$SessionExpireListener)
> [2016-10-07 19:00:32,610] INFO Creating /brokers/ids/3 (is it secure? false) 
> (kafka.utils.ZKCheckedEphemeral)
> [2016-10-07 19:00:32,611] INFO Result of znode creation is: NODEEXISTS 
> (kafka.utils.ZKCheckedEphemeral)
> [2016-10-07 19:00:32,614] ERROR Error handling event ZkEvent[New session 
> event sent to kafka.server.KafkaHealthcheck$SessionExpireListener@324f1bc] 
> (org.I0Itec.zkclient.ZkEventThread)
> java.lang.RuntimeException: A broker is already registered on the path 
> /brokers/ids/3. This probably indicates that you either have configured a 
> brokerid that is already in use, or else you have shutdown this broker and 
> restarted it faster than the zookeeper timeout so it appears to be 
> re-registering.
> at kafka.utils.ZkUtils.registerBrokerInZk(ZkUtils.scala:305)
> at kafka.utils.ZkUtils.registerBrokerInZk(ZkUtils.scala:291)
> at kafka.server.KafkaHealthcheck.register(KafkaHealthcheck.scala:70)
> at 
> 
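To illustrate the retry-loop approach that the removed 
createEphemeralPathExpectConflictHandleZKBug helper implemented, here is a rough 
sketch using the I0Itec ZkClient API that Kafka used at the time. The connect 
string, znode path and payload are placeholders, and the real Kafka helper 
performed additional checks; this is not the original implementation.

{code}
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;

public class EphemeralNodeRetryExample {

    // Keep trying to create an ephemeral znode until creation succeeds.
    // If the node still exists (e.g. left over from an expired session that
    // ZooKeeper has not cleaned up yet), back off and retry.
    public static void createEphemeralWithRetry(ZkClient zkClient, String path, Object data)
            throws InterruptedException {
        while (true) {
            try {
                zkClient.createEphemeral(path, data);
                return; // created successfully
            } catch (ZkNodeExistsException e) {
                Thread.sleep(1000); // wait for the stale node to expire, then retry
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Placeholder connect string, path and payload.
        ZkClient zkClient = new ZkClient("localhost:2181", 6000, 6000);
        createEphemeralWithRetry(zkClient, "/brokers/ids/3", "{\"host\":\"broker-3\",\"port\":9092}");
        zkClient.close();
    }
}
{code}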

[GitHub] kafka pull request #2535: MINOR: don't throw CommitFailedException during su...

2017-02-14 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2535


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[VOTE] 0.10.2.0 RC2

2017-02-14 Thread Ewen Cheslack-Postava
Hello Kafka users, developers and client-developers,

This is the third candidate for release of Apache Kafka 0.10.2.0.

This is a minor version release of Apache Kafka. It includes 19 new KIPs.
See the release notes and release plan
(https://cwiki.apache.org/confluence/display/KAFKA/Release+Plan+0.10.2.0)
for more details. A few feature
highlights: SASL-SCRAM support, improved client compatibility to allow use
of clients newer than the broker, session windows and global tables in the
Kafka Streams API, single message transforms in the Kafka Connect framework.

Important note: in addition to the artifacts generated using JDK7 for Scala
2.10 and 2.11, this release also includes experimental artifacts built
using JDK8 for Scala 2.12.

Important code changes since RC1 (non-docs, non system tests):

KAFKA-4756; The auto-generated broker id should be passed to
MetricReporter.configure
KAFKA-4761; Fix producer regression handling small or zero batch size

Release notes for the 0.10.2.0 release:
http://home.apache.org/~ewencp/kafka-0.10.2.0-rc2/RELEASE_NOTES.html

*** Please download, test and vote by February 17th 5pm ***

Kafka's KEYS file containing PGP keys we use to sign the release:
http://kafka.apache.org/KEYS

* Release artifacts to be voted upon (source and binary):
http://home.apache.org/~ewencp/kafka-0.10.2.0-rc2/

* Maven artifacts to be voted upon:
https://repository.apache.org/content/groups/staging/

* Javadoc:
http://home.apache.org/~ewencp/kafka-0.10.2.0-rc2/javadoc/

* Tag to be voted upon (off 0.10.2 branch) is the 0.10.2.0 tag:
https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=5712b489038b71ed8d5a679856d1dfaa925eadc1


* Documentation:
http://kafka.apache.org/0102/documentation.html

* Protocol:
http://kafka.apache.org/0102/protocol.html

* Successful Jenkins builds for the 0.10.2 branch:
Unit/integration tests: https://builds.apache.org/job/kafka-0.10.2-jdk7/77/
System tests: https://jenkins.confluent.io/job/system-test-kafka-0.10.2/29/

/**

Thanks,
Ewen


Re: [VOTE] KIP-82 Add Record Headers

2017-02-14 Thread radai
+1 from me.

also - a more usable link to the discussion thread:
http://markmail.org/message/x5wlkieexinovsz3

On Tue, Feb 14, 2017 at 9:42 AM, Michael Pearce 
wrote:

> Hi all,
>
> We would like to start the voting process for KIP-82 – Add record headers.
> The KIP can be found
> at
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 82+-+Add+Record+Headers
>
> Discussion thread(s) can be found here:
>
> http://search-hadoop.com/m/Kafka/uyzND1nSTOHTvj81?subj=
> Re+DISCUSS+KIP+82+Add+Record+Headers
> http://search-hadoop.com/m/Kafka/uyzND1Arxt22Tvj81?subj=
> Re+DISCUSS+KIP+82+Add+Record+Headers
> http://search-hadoop.com/?project=Kafka=KIP-82
>
>
>
> Thanks,
> Mike
>
> The information contained in this email is strictly confidential and for
> the use of the addressee only, unless otherwise indicated. If you are not
> the intended recipient, please do not read, copy, use or disclose to others
> this message or any attachment. Please also notify the sender by replying
> to this email or by telephone (+44(020 7896 0011) and then delete the email
> and any copies of it. Opinions, conclusion (etc) that do not relate to the
> official business of this company shall be understood as neither given nor
> endorsed by it. IG is a trading name of IG Markets Limited (a company
> registered in England and Wales, company number 04008957) and IG Index
> Limited (a company registered in England and Wales, company number
> 01190902). Registered address at Cannon Bridge House, 25 Dowgate Hill,
> London EC4R 2YA. Both IG Markets Limited (register number 195355) and IG
> Index Limited (register number 114059) are authorised and regulated by the
> Financial Conduct Authority.
>


Re: [DISCUSS] KIP-120: Cleanup Kafka Streams builder API

2017-02-14 Thread Matthias J. Sax
You can already output any number of records within .transform() by using
the ProcessorContext object provided via init()...


-Matthias
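As a minimal sketch of the approach described above (not taken from the Kafka 
code base; keys, values and names are illustrative), a Transformer can hold on 
to the ProcessorContext it receives in init() and call forward() as many times 
as it likes. Whether a null return value from transform() is tolerated may 
depend on the Streams version, so treat this only as a sketch.

{code}
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;

// Emits two records downstream for every input record by calling
// context.forward() directly instead of relying on the return value.
public class MultiEmitTransformer implements Transformer<String, String, KeyValue<String, String>> {

    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context; // keep the context handed to us in init()
    }

    @Override
    public KeyValue<String, String> transform(String key, String value) {
        context.forward(key, value + "-copy-1");
        context.forward(key, value + "-copy-2");
        return null; // nothing additional emitted via the return value
    }

    @Override
    public KeyValue<String, String> punctuate(long timestamp) {
        return null; // no scheduled output
    }

    @Override
    public void close() { }
}
{code}

It would be wired in with something like stream.transform(MultiEmitTransformer::new).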

On 2/14/17 9:16 AM, Guozhang Wang wrote:
>> and you can't output multiple records or branching logic from a
> transform();
> 
> For output multiple records in transform, we are currently working on
> https://issues.apache.org/jira/browse/KAFKA-4217, I think that should cover
> this use case.
> 
> For branching the output in transform, I agree this is not perfect but I
> think users can follow some patterns like "stream.transform().branch()",
> would that work for you?
> 
> 
> Guozhang
> 
> 
> On Tue, Feb 14, 2017 at 8:29 AM, Mathieu Fenniak <
> mathieu.fenn...@replicon.com> wrote:
> 
>> On Tue, Feb 14, 2017 at 1:14 AM, Guozhang Wang  wrote:
>>
>>> Some thoughts on the mixture usage of DSL / PAPI:
>>>
>>> There were some suggestions on mixing the usage of DSL and PAPI:
>>> https://issues.apache.org/jira/browse/KAFKA-3455, and after thinking it
>> a
>>> bit more carefully, I'd rather not recommend users following this
>> pattern,
>>> since in DSL this can always be achieved in process() / transform().
>> Hence
>>> I think it is okay to prevent such patterns in the new APIs. And for the
>>> same reasons, I think we can remove KStreamBuilder#newName() from the
>>> public APIs.
>>>
>>
>> I'm not sure that things can always be achieved by process() /
>> transform()... there are some limitations to these APIs.  You can't output
>> from a process(), and you can't output multiple records or branching logic
>> from a transform(); these are things that can be done in the PAPI quite
>> easily.
>>
>> I definitely understand a preference for using process()/transform() where
>> possible, but, they don't seem to replace the PAPI.
>>
>> I would love to be operating in a world that was entirely DSL.  But the DSL
>> is limited, and it isn't extensible (... by any stable API).  I don't mind
>> reaching into internals today and making my own life difficult to extend
>> it, and I'd continue to find a way to do that if you made the APIs distinct
>> and split, but I'm just expressing my preference that you not do that. :-)
>>
>> And about printing the topology for debuggability: I agrees this is a
>>> potential drawback, and I'd suggest maintain some functionality to build
>> a
>>> "dry topology" as Mathieu suggested; the difficulty is that, internally
>> we
>>> need a different "copy" of the topology for each thread so that they will
>>> not share any states, so we cannot directly pass in the topology into
>>> KafkaStreams instead of the topology builder. So how about adding a
>>> `ToplogyBuilder#toString` function which calls `build()` internally then
>>> prints the built dry topology?
>>>
>>
>> Well, this sounds better than KafkaStreams#toString() in that it doesn't
>> require a running processor.  But I'd really love to have a simple object
>> model for the topology, not a string output, so that I can output my own
>> debug format.  I currently have that in the form of
>> TopologyBuilder#nodeGroups() & TopologyBuilder#build(Integer).
>>
>> Mathieu
>>
> 
> 
> 





[jira] [Updated] (KAFKA-4762) Consumer throwing RecordTooLargeException even when messages are not that large

2017-02-14 Thread Vipul Singh (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4762?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vipul Singh updated KAFKA-4762:
---
Affects Version/s: 0.9.0.1

> Consumer throwing RecordTooLargeException even when messages are not that 
> large
> ---
>
> Key: KAFKA-4762
> URL: https://issues.apache.org/jira/browse/KAFKA-4762
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.1
>Reporter: Vipul Singh
>
> We were recently hit by a weird error. 
> Before going any further, a quick explanation of our service setup: we have a 
> producer which produces messages no larger than 256 KB (we have an explicit 
> check for this on the producer side), and on the client side we have a fetch 
> limit of 512 KB (max.partition.fetch.bytes is set to 524288 bytes). 
> Recently our client started to see this error:
> {quote}
> org.apache.kafka.common.errors.RecordTooLargeException: There are some 
> messages at [Partition=Offset]: {topic_name-0=9925056036} whose size is 
> larger than the fetch size 524288 and hence cannot be ever returned. Increase 
> the fetch size, or decrease the maximum message size the broker will allow.
> {quote}
> We tried consuming messages with another consumer, without any 
> max.partition.fetch.bytes limit, and it consumed fine. The messages were 
> small and did not seem to be greater than 256 KB.
> We took a log dump, and the log size looked fine.
> {quote}
> mpresscodec: NoCompressionCodec crc: 2473548911 keysize: 8
> offset: 9925056032 position: 191380053 isvalid: true payloadsize: 539 magic: 
> 0 compresscodec: NoCompressionCodec crc: 1656420267 keysize: 8
> offset: 9925056033 position: 191380053 isvalid: true payloadsize: 1551 magic: 
> 0 compresscodec: NoCompressionCodec crc: 2398479758 keysize: 8
> offset: 9925056034 position: 191380053 isvalid: true payloadsize: 1307 magic: 
> 0 compresscodec: NoCompressionCodec crc: 2845554215 keysize: 8
> offset: 9925056035 position: 191380053 isvalid: true payloadsize: 1520 magic: 
> 0 compresscodec: NoCompressionCodec crc: 3106984195 keysize: 8
> offset: 9925056036 position: 191713371 isvalid: true payloadsize: 1207 magic: 
> 0 compresscodec: NoCompressionCodec crc: 3462154435 keysize: 8
> offset: 9925056037 position: 191713371 isvalid: true payloadsize: 418 magic: 
> 0 compresscodec: NoCompressionCodec crc: 1536701802 keysize: 8
> offset: 9925056038 position: 191713371 isvalid: true payloadsize: 299 magic: 
> 0 compresscodec: NoCompressionCodec crc: 4112567543 keysize: 8
> offset: 9925056039 position: 191713371 isvalid: true payloadsize: 1571 magic: 
> 0 compresscodec: NoCompressionCodec crc: 3696994307 keysize: 8
> {quote}
> Has anyone seen something similar, or any pointers for troubleshooting this further?
> Please note: to work around this issue, we deployed a new consumer without the 
> max.partition.fetch.bytes limit, and it worked fine.
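For reference, a minimal sketch of how the fetch limit described above is 
configured on the consumer. The broker address, group id and topic name are 
placeholders; this only reproduces the configuration, not the reported problem.

{code}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class FetchLimitExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-1:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");          // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        // The per-partition fetch limit from the report above: 512 KB.
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 524288);

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("topic_name")); // placeholder topic
            consumer.poll(1000);
        }
    }
}
{code}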



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[VOTE] KIP-82 Add Record Headers

2017-02-14 Thread Michael Pearce
Hi all,

We would like to start the voting process for KIP-82 – Add record headers. The 
KIP can be found
at

https://cwiki.apache.org/confluence/display/KAFKA/KIP-82+-+Add+Record+Headers

Discussion thread(s) can be found here:

http://search-hadoop.com/m/Kafka/uyzND1nSTOHTvj81?subj=Re+DISCUSS+KIP+82+Add+Record+Headers
http://search-hadoop.com/m/Kafka/uyzND1Arxt22Tvj81?subj=Re+DISCUSS+KIP+82+Add+Record+Headers
http://search-hadoop.com/?project=Kafka=KIP-82



Thanks,
Mike

The information contained in this email is strictly confidential and for the 
use of the addressee only, unless otherwise indicated. If you are not the 
intended recipient, please do not read, copy, use or disclose to others this 
message or any attachment. Please also notify the sender by replying to this 
email or by telephone (+44(020 7896 0011) and then delete the email and any 
copies of it. Opinions, conclusion (etc) that do not relate to the official 
business of this company shall be understood as neither given nor endorsed by 
it. IG is a trading name of IG Markets Limited (a company registered in England 
and Wales, company number 04008957) and IG Index Limited (a company registered 
in England and Wales, company number 01190902). Registered address at Cannon 
Bridge House, 25 Dowgate Hill, London EC4R 2YA. Both IG Markets Limited 
(register number 195355) and IG Index Limited (register number 114059) are 
authorised and regulated by the Financial Conduct Authority.


[jira] [Commented] (KAFKA-4159) Allow overriding producer & consumer properties at the connector level

2017-02-14 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15866212#comment-15866212
 ] 

Jason Gustafson commented on KAFKA-4159:


[~sjdurfey] I've added you as a contributor. You should be able to assign this 
and other Kafka jiras.

> Allow overriding producer & consumer properties at the connector level
> --
>
> Key: KAFKA-4159
> URL: https://issues.apache.org/jira/browse/KAFKA-4159
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect
>Reporter: Shikhar Bhushan
>
> As an example use case: overriding a sink connector's consumer's partition 
> assignment strategy.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Request for JIRA access

2017-02-14 Thread Thomas Dutta
Dear Team,

I am a Computer Science graduate student at the University of Illinois at
Chicago. I am a newbie and I want to contribute to the Apache Kafka project.
Could you please add me to the contributor list so that I can assign JIRA
tickets to myself?

Please let me know if you need additional information.

Regards,
Thomas Dutta


kafka consumer coordinator is unknown issue

2017-02-14 Thread Ashwini Mhatre (asmhatre)
Hi,
I have a 3-node Kafka cluster and one Kafka consumer which listens to a 
particular topic. I am facing the following issue:

08:52:25.733 [SparkApp-akka.actor.default-dispatcher-2] DEBUG 
o.a.kafka.common.metrics.Metrics - Added sensor with name heartbeat-latency

08:52:25.733 [SparkApp-akka.actor.default-dispatcher-2] DEBUG 
o.a.kafka.common.metrics.Metrics - Added sensor with name join-latency

08:52:25.734 [SparkApp-akka.actor.default-dispatcher-2] DEBUG 
o.a.kafka.common.metrics.Metrics - Added sensor with name sync-latency

08:52:25.739 [SparkApp-akka.actor.default-dispatcher-2] DEBUG 
o.a.kafka.common.metrics.Metrics - Added sensor with name commit-latency

08:52:25.746 [SparkApp-akka.actor.default-dispatcher-2] DEBUG 
o.a.kafka.common.metrics.Metrics - Added sensor with name bytes-fetched

08:52:25.747 [SparkApp-akka.actor.default-dispatcher-2] DEBUG 
o.a.kafka.common.metrics.Metrics - Added sensor with name records-fetched

08:52:25.747 [SparkApp-akka.actor.default-dispatcher-2] DEBUG 
o.a.kafka.common.metrics.Metrics - Added sensor with name fetch-latency

08:52:25.747 [SparkApp-akka.actor.default-dispatcher-2] DEBUG 
o.a.kafka.common.metrics.Metrics - Added sensor with name records-lag

08:52:25.747 [SparkApp-akka.actor.default-dispatcher-2] DEBUG 
o.a.kafka.common.metrics.Metrics - Added sensor with name fetch-throttle-time

08:52:25.748 [SparkApp-akka.actor.default-dispatcher-2] WARN  
o.a.k.c.consumer.ConsumerConfig - The configuration autooffset.reset = earliest 
was supplied but isn't a known config.

08:52:25.748 [SparkApp-akka.actor.default-dispatcher-2] WARN  
o.a.k.c.consumer.ConsumerConfig - The configuration auto.commit = true was 
supplied but isn't a known config.

08:52:25.749 [SparkApp-akka.actor.default-dispatcher-2] INFO  
o.a.kafka.common.utils.AppInfoParser - Kafka version : 0.10.0.1

08:52:25.749 [SparkApp-akka.actor.default-dispatcher-2] INFO  
o.a.kafka.common.utils.AppInfoParser - Kafka commitId : a7a17cdec9eaa6c5

08:52:25.750 [SparkApp-akka.actor.default-dispatcher-2] DEBUG 
o.a.k.clients.consumer.KafkaConsumer - Kafka consumer created

08:52:25.750 [SparkApp-akka.actor.default-dispatcher-2] DEBUG 
o.a.k.clients.consumer.KafkaConsumer - Subscribed to partition(s): 
orchestrator-0

08:52:25.751 [SparkApp-akka.actor.default-dispatcher-2] DEBUG 
o.a.k.clients.consumer.KafkaConsumer - Seeking to offset 0 for partition 
orchestrator-0

08:52:26.756 [SparkApp-akka.actor.default-dispatcher-9] DEBUG 
o.a.k.c.c.i.AbstractCoordinator - Sending coordinator request for group 
NewCustomers to broker datapipeline-us-west-2-kafka-02.sse.com:9092 (id: -2 
rack: null)

08:52:26.773 [SparkApp-akka.actor.default-dispatcher-9] DEBUG 
o.apache.kafka.clients.NetworkClient - Initiating connection to node -2 at 
datapipeline-us-west-2-kafka-02.sse.com:9092.

08:52:26.827 [SparkApp-akka.actor.default-dispatcher-9] DEBUG 
o.apache.kafka.clients.NetworkClient - Initialize connection to node -3 for 
sending metadata request

08:52:26.827 [SparkApp-akka.actor.default-dispatcher-9] DEBUG 
o.apache.kafka.clients.NetworkClient - Initiating connection to node -3 at 
datapipeline-us-west-2-kafka-03.sse.com:9092.

08:52:26.929 [SparkApp-akka.actor.default-dispatcher-9] DEBUG 
o.apache.kafka.clients.NetworkClient - Initialize connection to node -1 for 
sending metadata request

08:52:26.929 [SparkApp-akka.actor.default-dispatcher-9] DEBUG 
o.apache.kafka.clients.NetworkClient - Initiating connection to node -1 at 
datapipeline-us-west-2-kafka-01.sse.com:9092.

08:52:30.739 [SparkApp-akka.actor.default-dispatcher-9] DEBUG 
o.a.k.c.c.i.ConsumerCoordinator - Cannot auto-commit offsets for group 
NewCustomers since the coordinator is unknown

08:52:30.838 [SparkApp-akka.actor.default-dispatcher-9] DEBUG 
o.a.k.c.c.i.ConsumerCoordinator - Cannot auto-commit offsets for group 
NewCustomers since the coordinator is unknown

08:52:30.938 [SparkApp-akka.actor.default-dispatcher-9] DEBUG 
o.a.k.c.c.i.ConsumerCoordinator - Cannot auto-commit offsets for group 
NewCustomers since the coordinator is unknown

08:52:31.039 [SparkApp-akka.actor.default-dispatcher-9] DEBUG 
o.a.k.c.c.i.ConsumerCoordinator - Cannot auto-commit offsets for group 
NewCustomers since the coordinator is unknown

08:52:31.139 [SparkApp-akka.actor.default-dispatcher-9] DEBUG 
o.a.k.c.c.i.ConsumerCoordinator - Cannot auto-commit offsets for group 
NewCustomers since the coordinator is unknown

08:52:31.239 [SparkApp-akka.actor.default-dispatcher-9] DEBUG 
o.a.k.c.c.i.ConsumerCoordinator - Cannot auto-commit offsets for group 
NewCustomers since the coordinator is unknown

08:52:31.340 [SparkApp-akka.actor.default-dispatcher-9] DEBUG 
o.a.k.c.c.i.ConsumerCoordinator - Cannot auto-commit offsets for group 
NewCustomers since the coordinator is unknown

08:52:31.439 [SparkApp-akka.actor.default-dispatcher-9] DEBUG 
o.a.k.c.c.i.ConsumerCoordinator - Cannot auto-commit offsets for group 
NewCustomers since the coordinator is unknown

08:52:31.539 
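The two WARN lines in the (truncated) log above suggest that the configuration 
keys were misspelled; the standard names are auto.offset.reset and 
enable.auto.commit. A minimal sketch for reference follows; the bootstrap server 
and group id are taken from the log above, and this addresses only the warnings, 
not necessarily the coordinator problem itself.

{code}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class ConsumerConfigKeysExample {
    public static Properties baseConfig() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "datapipeline-us-west-2-kafka-01.sse.com:9092"); // from the log above
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "NewCustomers");
        // Standard key is "auto.offset.reset" (the log shows "autooffset.reset").
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Standard key is "enable.auto.commit" (the log shows "auto.commit").
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        return props;
    }
}
{code}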

Re: Request Wiki edit permission for mfenniak

2017-02-14 Thread Guozhang Wang
Done, you can now also assign JIRAs to yourself. Cheers.

Guozhang

On Tue, Feb 14, 2017 at 8:51 AM, Mathieu Fenniak <
mathieu.fenn...@replicon.com> wrote:

> Hello,
>
> I'd like to contribute a KIP, and so I am requesting access to edit the
> Kafka Wiki for the user "mfenniak".
>
> Thanks,
>
> Mathieu
>



-- 
-- Guozhang


[jira] [Commented] (KAFKA-4159) Allow overriding producer & consumer properties at the connector level

2017-02-14 Thread Stephen Durfey (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15866127#comment-15866127
 ] 

Stephen Durfey commented on KAFKA-4159:
---

I had a use case for this feature. I wanted to be able to group an arbitrary 
number of connectors under the same consumer group name, without requiring that 
all connectors share the same group name (which is what happens when group.id is 
overridden in the worker config). So this change effectively does the same thing 
as overriding at the worker level, except that it allows it at the connector level. 

I'm not able to assign this to myself. 

https://github.com/apache/kafka/pull/2548

> Allow overriding producer & consumer properties at the connector level
> --
>
> Key: KAFKA-4159
> URL: https://issues.apache.org/jira/browse/KAFKA-4159
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect
>Reporter: Shikhar Bhushan
>
> As an example use case: overriding a sink connector's consumer's partition 
> assignment strategy.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Request Wiki edit permission for mfenniak

2017-02-14 Thread Mathieu Fenniak
Hello,

I'd like to contribute a KIP, and so I am requesting access to edit the
Kafka Wiki for the user "mfenniak".

Thanks,

Mathieu


get function

2017-02-14 Thread Samy CHBINOU
In Kafka there is a Subscribe API to be notified of incoming message 
values from a topic, but I didn't see a Get(key, topic) function, i.e. a 
Get function to retrieve a value by key from inside a topic (like in 
Redis). Isn't this feature implemented? If it doesn't exist, do you think 
it would be easy to implement?

Best Regards


Re: [DISCUSS] KIP-120: Cleanup Kafka Streams builder API

2017-02-14 Thread Mathieu Fenniak
On Tue, Feb 14, 2017 at 9:37 AM, Damian Guy  wrote:

> > And about printing the topology for debuggability: I agrees this is a
> > > potential drawback, and I'd suggest maintain some functionality to
> build
> > a
> > > "dry topology" as Mathieu suggested; the difficulty is that, internally
> > we
> > > need a different "copy" of the topology for each thread so that they
> will
> > > not share any states, so we cannot directly pass in the topology into
> > > KafkaStreams instead of the topology builder. So how about adding a
> > > `ToplogyBuilder#toString` function which calls `build()` internally
> then
> > > prints the built dry topology?
> > >
> >
> > Well, this sounds better than KafkaStreams#toString() in that it doesn't
> > require a running processor.  But I'd really love to have a simple object
> > model for the topology, not a string output, so that I can output my own
> > debug format.  I currently have that in the form of
> > TopologyBuilder#nodeGroups() & TopologyBuilder#build(Integer).
> >
>
> How about a method on TopologyBuilder that you pass a functional interface
> to and it gets called once for each ProcessorTopology? We may need to add a
> new public class that represents the topology (i'm not sure we want to
> 'expose` ProcessorTopology as public).
>

That sounds awesome.

Mathieu


Re: [DISCUSS] KIP-120: Cleanup Kafka Streams builder API

2017-02-14 Thread Damian Guy
> And about printing the topology for debuggability: I agrees this is a
> > potential drawback, and I'd suggest maintain some functionality to build
> a
> > "dry topology" as Mathieu suggested; the difficulty is that, internally
> we
> > need a different "copy" of the topology for each thread so that they will
> > not share any states, so we cannot directly pass in the topology into
> > KafkaStreams instead of the topology builder. So how about adding a
> > `ToplogyBuilder#toString` function which calls `build()` internally then
> > prints the built dry topology?
> >
>
> Well, this sounds better than KafkaStreams#toString() in that it doesn't
> require a running processor.  But I'd really love to have a simple object
> model for the topology, not a string output, so that I can output my own
> debug format.  I currently have that in the form of
> TopologyBuilder#nodeGroups() & TopologyBuilder#build(Integer).
>

How about a method on TopologyBuilder that you pass a functional interface
to, and it gets called once for each ProcessorTopology? We may need to add a
new public class that represents the topology (I'm not sure we want to
expose ProcessorTopology as public).
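A purely hypothetical sketch of what such a callback-style API could look like; 
none of these type or method names exist in Kafka, they only illustrate the idea 
of handing a functional interface to the builder and having it invoked once per 
sub-topology.

{code}
// Hypothetical sketch only: these names do not exist in Kafka; they just
// illustrate the "pass a functional interface, called once per sub-topology"
// idea from the message above.
import java.util.Set;

interface SubTopologyDescription {
    int id();                       // e.g. the node-group id
    Set<String> sourceTopics();
    Set<String> processorNames();
}

@FunctionalInterface
interface SubTopologyVisitor {
    void visit(SubTopologyDescription subTopology);
}

// A TopologyBuilder could then expose something like:
//     void describe(SubTopologyVisitor visitor);
// letting callers render their own debug output:
class TopologyDebugPrinter implements SubTopologyVisitor {
    @Override
    public void visit(SubTopologyDescription t) {
        System.out.println("sub-topology " + t.id()
                + " sources=" + t.sourceTopics()
                + " processors=" + t.processorNames());
    }
}
{code}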


Re: [DISCUSS] KIP-120: Cleanup Kafka Streams builder API

2017-02-14 Thread Mathieu Fenniak
On Tue, Feb 14, 2017 at 1:14 AM, Guozhang Wang  wrote:

> Some thoughts on the mixture usage of DSL / PAPI:
>
> There were some suggestions on mixing the usage of DSL and PAPI:
> https://issues.apache.org/jira/browse/KAFKA-3455, and after thinking it a
> bit more carefully, I'd rather not recommend users following this pattern,
> since in DSL this can always be achieved in process() / transform(). Hence
> I think it is okay to prevent such patterns in the new APIs. And for the
> same reasons, I think we can remove KStreamBuilder#newName() from the
> public APIs.
>

I'm not sure that things can always be achieved by process() /
transform()... there are some limitations to these APIs.  You can't output
from a process(), and you can't output multiple records or branching logic
from a transform(); these are things that can be done in the PAPI quite
easily.

I definitely understand a preference for using process()/transform() where
possible, but, they don't seem to replace the PAPI.

I would love to be operating in a world that was entirely DSL.  But the DSL
is limited, and it isn't extensible (... by any stable API).  I don't mind
reaching into internals today and making my own life difficult to extend
it, and I'd continue to find a way to do that if you made the APIs distinct
and split, but I'm just expressing my preference that you not do that. :-)

And about printing the topology for debuggability: I agrees this is a
> potential drawback, and I'd suggest maintain some functionality to build a
> "dry topology" as Mathieu suggested; the difficulty is that, internally we
> need a different "copy" of the topology for each thread so that they will
> not share any states, so we cannot directly pass in the topology into
> KafkaStreams instead of the topology builder. So how about adding a
> `ToplogyBuilder#toString` function which calls `build()` internally then
> prints the built dry topology?
>

Well, this sounds better than KafkaStreams#toString() in that it doesn't
require a running processor.  But I'd really love to have a simple object
model for the topology, not a string output, so that I can output my own
debug format.  I currently have that in the form of
TopologyBuilder#nodeGroups() & TopologyBuilder#build(Integer).

Mathieu


[GitHub] kafka pull request #2547: MINOR: add session windows doc to streams.html

2017-02-14 Thread dguy
GitHub user dguy opened a pull request:

https://github.com/apache/kafka/pull/2547

MINOR: add session windows doc to streams.html



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dguy/kafka session-window-doc

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2547.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2547


commit dae48428990fffb025f802ce1b68caa197dedfa8
Author: Damian Guy 
Date:   2017-02-14T16:19:43Z

session windows doc




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: [DISCUSS] KIP-112: Handle disk failure for JBOD

2017-02-14 Thread Dong Lin
Hey Jun,

I just realized that, by asking whether KIP-112 and KIP-113 will be in the
same release, you may be suggesting that a tool for listing offline
directories is necessary for KIP-112. I think such a tool is useful but
doesn't have to be included in KIP-112. This is because, as of now, an admin
needs to log into the broker machine and check the broker log to figure out
the cause of a broker failure and the bad log directory in case of disk
failure. KIP-112 won't make this harder, since the admin can still figure out
the bad log directory by doing the same thing. Thus it is probably OK to just
include this script in KIP-113. Regardless, my hope is to finish both KIPs
ASAP and get them into the same release, since both KIPs are needed for the
JBOD setup.

Thanks,
Dong

On Mon, Feb 13, 2017 at 5:52 PM, Dong Lin  wrote:

> And the test plan has also been updated to simulate disk failure by
> changing log directory permission to 000.
>
> On Mon, Feb 13, 2017 at 5:50 PM, Dong Lin  wrote:
>
>> Hi Jun,
>>
>> Thanks for the reply. These comments are very helpful. Let me answer them
>> inline.
>>
>>
>> On Mon, Feb 13, 2017 at 3:25 PM, Jun Rao  wrote:
>>
>>> Hi, Dong,
>>>
>>> Thanks for the reply. A few more replies and new comments below.
>>>
>>> On Fri, Feb 10, 2017 at 4:27 PM, Dong Lin  wrote:
>>>
>>> > Hi Jun,
>>> >
>>> > Thanks for the detailed comments. Please see answers inline:
>>> >
>>> > On Fri, Feb 10, 2017 at 3:08 PM, Jun Rao  wrote:
>>> >
>>> > > Hi, Dong,
>>> > >
>>> > > Thanks for the updated wiki. A few comments below.
>>> > >
>>> > > 1. Topics get created
>>> > > 1.1 Instead of storing successfully created replicas in ZK, could we
>>> > store
>>> > > unsuccessfully created replicas in ZK? Since the latter is less
>>> common,
>>> > it
>>> > > probably reduces the load on ZK.
>>> > >
>>> >
>>> > We can store unsuccessfully created replicas in ZK. But I am not sure
>>> if
>>> > that can reduce write load on ZK.
>>> >
>>> > If we want to reduce write load on ZK using by store unsuccessfully
>>> created
>>> > replicas in ZK, then broker should not write to ZK if all replicas are
>>> > successfully created. It means that if /broker/topics/[topic]/partiti
>>> > ons/[partitionId]/controller_managed_state doesn't exist in ZK for a
>>> given
>>> > partition, we have to assume all replicas of this partition have been
>>> > successfully created and send LeaderAndIsrRequest with create = false.
>>> This
>>> > becomes a problem if controller crashes before receiving
>>> > LeaderAndIsrResponse to validate whether a replica has been created.
>>> >
>>> > I think this approach and reduce the number of bytes stored in ZK. But
>>> I am
>>> > not sure if this is a concern.
>>> >
>>> >
>>> >
>>> I was mostly concerned about the controller failover time. Currently, the
>>> controller failover is likely dominated by the cost of reading
>>> topic/partition level information from ZK. If we add another partition
>>> level path in ZK, it probably will double the controller failover time.
>>> If
>>> the approach of representing the non-created replicas doesn't work, have
>>> you considered just adding the created flag in the leaderAndIsr path in
>>> ZK?
>>>
>>>
>> Yes, I have considered adding the created flag in the leaderAndIsr path
>> in ZK. If we were to add created flag per replica in the
>> LeaderAndIsrRequest, then it requires a lot of change in the code base.
>>
>> If we don't add created flag per replica in the LeaderAndIsrRequest, then
>> the information in leaderAndIsr path in ZK and LeaderAndIsrRequest would be
>> different. Further, the procedure for broker to update ISR in ZK will be a
>> bit complicated. When leader updates leaderAndIsr path in ZK, it will have
>> to first read created flags from ZK, change isr, and write leaderAndIsr
>> back to ZK. And it needs to check znode version and re-try write operation
>> in ZK if controller has updated ZK during this period. This is in contrast
>> to the current implementation where the leader either gets all the
>> information from LeaderAndIsrRequest sent by controller, or determine the
>> infromation by itself (e.g. ISR), before writing to leaderAndIsr path in ZK.
>>
>> It seems to me that the above solution is a bit complicated and not
>> clean. Thus I come up with the design in this KIP to store this created
>> flag in a separate zk path. The path is named controller_managed_state to
>> indicate that we can store in this znode all information that is managed by
>> controller only, as opposed to ISR.
>>
>> I agree with your concern of increased ZK read time during controller
>> failover. How about we store the "created" information in the
>> znode /brokers/topics/[topic]? We can change that znode to have the
>> following data format:
>>
>> {
>>   "version" : 2,
>>   "created" : {
>> "1" : [1, 2, 3],
>> ...
>>   }
>>   "partition" : {
>> "1" : [1, 2, 3],
>> ...
>>   }

[GitHub] kafka pull request #2546: KAFKA-4764: Improve diagnostics for SASL auth fail...

2017-02-14 Thread rajinisivaram
GitHub user rajinisivaram opened a pull request:

https://github.com/apache/kafka/pull/2546

KAFKA-4764: Improve diagnostics for SASL auth failures

First step towards improving handling of client SASL authentication 
failures.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rajinisivaram/kafka KAFKA-4764

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2546.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2546


commit 643384757ec3364f2f3f5efcd3520a9d1e6f91de
Author: Rajini Sivaram 
Date:   2017-02-14T13:48:22Z

KAFKA-4764: Improve diagnostics for SASL auth failures




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-4764) Improve diagnostics for SASL authentication failures

2017-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4764?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15865903#comment-15865903
 ] 

ASF GitHub Bot commented on KAFKA-4764:
---

GitHub user rajinisivaram opened a pull request:

https://github.com/apache/kafka/pull/2546

KAFKA-4764: Improve diagnostics for SASL auth failures

First step towards improving handling of client SASL authentication 
failures.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rajinisivaram/kafka KAFKA-4764

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2546.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2546


commit 643384757ec3364f2f3f5efcd3520a9d1e6f91de
Author: Rajini Sivaram 
Date:   2017-02-14T13:48:22Z

KAFKA-4764: Improve diagnostics for SASL auth failures




> Improve diagnostics for SASL authentication failures
> 
>
> Key: KAFKA-4764
> URL: https://issues.apache.org/jira/browse/KAFKA-4764
> Project: Kafka
>  Issue Type: Improvement
>  Components: security
>Affects Versions: 0.10.2.0
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
> Fix For: 0.10.3.0
>
>
> At the moment, broker closes the client connection if SASL authentication 
> fails. Clients see this as a connection failure and do not get any feedback 
> for the reason why the connection was closed. Producers and consumers retry, 
> attempting to create successful connections, treating authentication failures 
> as transient failures. There are no log entries on the client-side which 
> indicate that any of these connection failures were due to authentication 
> failure.
> This JIRA will aim to improve diagnosis of authentication failures with the 
> following changes:
> - Broker will send an authentication error code if SASL authentication fails, 
> just before closing the connection. This will be treated as an invalid token 
> by the client authenticator, and the error handling for invalid tokens will 
> be updated to report authentication failure for this case. This is a bit of a 
> hack, but would work with GSSAPI, PLAIN and SCRAM. SASL itself doesn't 
> provide a mechanism-independent way of reporting authentication failures. An 
> alternative would be to wrap SASL authentication in Kafka request/response to 
> enables error codes to be sent as Kafka response, but that would be a much 
> bigger change.
> - Log a warning in clients for authentication failures, distinguishing these 
> from EOF exceptions due to connection failure
> - Blackout nodes to which connection failed due to authentication error, no 
> more attempts will be made to connect to these nodes.
> - We should use the connection state to improve handling of producer/consumer 
> requests, avoiding unnecessary blocking. This will not be addressed in this 
> JIRA, KAFKA-3899 should be able to use the additional state from JIRA to fix 
> this issue.
> This JIRA also does not change handling of SSL authentication failures. 
> javax.net.debug provides sufficient diagnostics for this case, I don't 
> believe there is sufficient information in `SslTransportLayer` to treat these 
> in a consistent way with SASL authentication failures.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (KAFKA-4764) Improve diagnostics for SASL authentication failures

2017-02-14 Thread Rajini Sivaram (JIRA)
Rajini Sivaram created KAFKA-4764:
-

 Summary: Improve diagnostics for SASL authentication failures
 Key: KAFKA-4764
 URL: https://issues.apache.org/jira/browse/KAFKA-4764
 Project: Kafka
  Issue Type: Improvement
  Components: security
Affects Versions: 0.10.2.0
Reporter: Rajini Sivaram
Assignee: Rajini Sivaram
 Fix For: 0.10.3.0


At the moment, the broker closes the client connection if SASL authentication 
fails. Clients see this as a connection failure and do not get any feedback 
about why the connection was closed. Producers and consumers retry, attempting 
to create successful connections, treating authentication failures as 
transient failures. There are no log entries on the client side which indicate 
that any of these connection failures were due to an authentication failure.

This JIRA will aim to improve diagnosis of authentication failures with the 
following changes:
- Broker will send an authentication error code if SASL authentication fails, 
just before closing the connection. This will be treated as an invalid token by 
the client authenticator, and the error handling for invalid tokens will be 
updated to report authentication failure for this case. This is a bit of a 
hack, but would work with GSSAPI, PLAIN and SCRAM. SASL itself doesn't provide 
a mechanism-independent way of reporting authentication failures. An 
alternative would be to wrap SASL authentication in a Kafka request/response to 
enable error codes to be sent as a Kafka response, but that would be a much 
bigger change.

- Log a warning in clients for authentication failures, distinguishing these 
from EOF exceptions due to connection failure

- Blackout nodes to which connection failed due to authentication error, no 
more attempts will be made to connect to these nodes.

- We should use the connection state to improve handling of producer/consumer 
requests, avoiding unnecessary blocking. This will not be addressed in this 
JIRA, KAFKA-3899 should be able to use the additional state from JIRA to fix 
this issue.

This JIRA also does not change handling of SSL authentication failures. 
javax.net.debug provides sufficient diagnostics for this case, I don't believe 
there is sufficient information in `SslTransportLayer` to treat these in a 
consistent way with SASL authentication failures.







--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [DISCUSS] KIP-116 - Add State Store Checkpoint Interval Configuration

2017-02-14 Thread Eno Thereska
Even if users commit on every record, the expensive part will not be the 
checkpointing proposed in this KIP, but the rest of the commit.

Eno
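For reference, a minimal sketch of the existing commit-interval setting that 
checkpoint frequency would piggyback on under the approach discussed above. The 
application id and bootstrap servers are placeholders, and the checkpoint-interval 
config itself is only proposed in KIP-116, so it is not shown.

{code}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class CommitIntervalExample {
    public static Properties config() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "checkpoint-demo");   // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        // Commit (and, under the KIP-116 proposal, checkpoint) every 30 seconds.
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30 * 1000);
        return props;
    }
}
{code}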


> On 13 Feb 2017, at 23:46, Guozhang Wang  wrote:
> 
> I think I'm OK to always enable checkpointing, but I'm not sure if we want
> to checkpoint on every commit. Since in the extreme case users can commit
> on completed processing each record. So I think it is still valuable to
> have a checkpoint internal config in this KIP, which can be ignored if EOS
> is turned on. That being said, if most people are favoring checkpointing on
> each commit we can try that with this as well, since it won't change any
> public APIs and we can still add this config in the future if we do observe
> some users reporting it has huge perf impacts.
> 
> 
> 
> Guozhang
> 
> On Fri, Feb 10, 2017 at 12:20 PM, Damian Guy  wrote:
> 
>> I'm fine with that. Gouzhang?
>> On Fri, 10 Feb 2017 at 19:45, Matthias J. Sax 
>> wrote:
>> 
>>> I am actually supporting Eno's view: checkpoint on every commit.
>>> 
>>> @Dhwani: I understand your view and did raise the same question about
>>> performance trade-off with checkpoiting enabled/disabled etc. However,
>>> it seems that writing the checkpoint file is super cheap -- thus, there
>>> is nothing to gain performance wise by disabling it.
>>> 
>>> For Streams EoS we do not need the checkpoint file -- but we should have
>>> a switch for EoS anyway and can disable the checkpoint file for this
>>> case. And even if there is no switch and we enable EoS all the time, we
>>> can get rid of the checkpoint file overall (making the parameter
>> obsolete).
>>> 
>>> IMHO, if the config parameter is not really useful, we should not have
>> it.
>>> 
>>> 
>>> -Matthias
>>> 
>>> 
>>> On 2/10/17 9:27 AM, Damian Guy wrote:
 Gouzhang, Thanks for the clarification. Understood.
 
 Eno, you are correct if we just used commit interval then we wouldn't
>>> need
 a KIP. But, then we'd have no way of turning it off.
 
 On Fri, 10 Feb 2017 at 17:14 Eno Thereska 
>>> wrote:
 
> A quick check: the checkpoint file is not new, we're just exposing a
>>> knob
> on when to set it, right? Would turning if off still do what it does
>>> today
> (i.e., write the checkpoint at the end when the user quits?) So it's
>>> not a
> new feature as such, I was only recommending we dial up the frequency
>> by
> default. With that option arguably we don't even need a KIP.
> 
> Eno
> 
> 
> 
>> On 10 Feb 2017, at 17:02, Guozhang Wang  wrote:
>> 
>> Damian,
>> 
>> I was thinking if it is a new failure scenarios but as Eno pointed
>> out
>>> it
>> was not.
>> 
>> Another thing I was considering is if it has any impact for
>>> incorporating
>> KIP-98 to avoid duplicates: if there is a failure in the middle of a
>> transaction, then upon recovery we cannot rely on the local state
>> store
>> file even if the checkpoint file exists, since the local state store
>>> file
>> may not be at the transaction boundaries. But since Streams will
>> likely
> to
>> have EOS as an opt-in I think it is still worthwhile to add this
>>> feature,
>> just keeping in mind that when EOS is turned on it may cease to be
>> effective.
>> 
>> And yes, I'd suggest we leave the config value to be possibly
> non-positive
>> to indicate not turning on this feature for the reason above: if it
>>> will
>> not be effective then we want to leave it as an option to be turned
>>> off.
>> 
>> Guozhang
>> 
>> 
>> On Fri, Feb 10, 2017 at 8:06 AM, Eno Thereska <
>> eno.there...@gmail.com>
>> wrote:
>> 
>>> The overhead of writing to the checkpoint file should be much, much
>>> smaller than the overall overhead of doing a commit, so I think
>> tuning
> the
>>> commit time is sufficient to guide performance tradeoffs.
>>> 
>>> Eno
>>> 
 On 10 Feb 2017, at 13:08, Dhwani Katagade <
> dhwani_katag...@persistent.co
>>> .in> wrote:
 
 May be for fine tuning the performance.
 Say we don't need the checkpointing and would like to gain the lil
>>> bit
>>> of performance improvement by turning it off.
 The trade off is between giving people control knobs vs
>> complicating
> the
>>> complete set of knobs.
 
 -dk
 
 On Friday 10 February 2017 04:05 PM, Eno Thereska wrote:
> I can't see why users would care to turn it off.
> 
> Eno
>> On 10 Feb 2017, at 10:29, Damian Guy 
>> wrote:
>> 
>> Hi Eno,
>> 
>> Sounds good to me. The only reason i can think of is if we want
>> to
>>> be
>>> able
>> to turn it off.
>> Gouzhang - thoughts?
>> 

Re: [VOTE] 0.10.2.0 RC1

2017-02-14 Thread Ian Duffy
Hi Matthias and Guozhang,

+1 (non-binding)

We eventually solved this on our end. It boiled down to dodgy data and
unexpected hidden errors in our processing steps. This eventually resulted
in a session timeout, which triggered the rebalances.

Thanks again for all your help.
Ian.


On 14 February 2017 at 08:06, Moczarski, Swen <
smoczar...@ebay-kleinanzeigen.de> wrote:

> Thanks a lot for the fast fix!
>
> I tested my code with the fix and it works fine without exception.
>
> Regards,
> Swen
>
> Am 2/13/17, 8:38 PM schrieb "Guozhang Wang" :
>
> Thanks for reporting the JIRA Swen.
>
> Jason has a patch ready under KAFKA-4761 and I have reviewed it. You
> could
> try it out and see if it has fixed your issue.
>
> After this is merged in, we will need another RC.
>
>
> Guozhang
>
> On Mon, Feb 13, 2017 at 9:52 AM, Moczarski, Swen <
> smoczar...@ebay-kleinanzeigen.de> wrote:
>
> > +0 (non-binding)
> >
> > Thanks for compiling a new release candidate.
> >
> > I get an NullPointerException when setting batch.size=0 on producer
> > config. This worked before with 0.10.1.1.
> > See https://issues.apache.org/jira/browse/KAFKA-4761
> >
> > Regards,
> > Swen
> >
> > Am 2/10/17, 5:51 PM schrieb "Ewen Cheslack-Postava" <
> e...@confluent.io>:
> >
> > Hello Kafka users, developers and client-developers,
> >
> > This is RC1 for release of Apache Kafka 0.10.2.0.
> >
> > This is a minor version release of Apache Kafka. It includes 19
> new
> > KIPs.
> > See the release notes and release plan (
> https://cwiki.apache.org/
> > confluence/display/KAFKA/Release+Plan+0.10.2.0) for more
> details. A
> > few
> > feature highlights: SASL-SCRAM support, improved client
> compatibility
> > to
> > allow use of clients newer than the broker, session windows and
> global
> > tables in the Kafka Streams API, single message transforms in
> the Kafka
> > Connect framework.
> >
> > Important note: in addition to the artifacts generated using
> JDK7 for
> > Scala
> > 2.10 and 2.11, this release also includes experimental artifacts
> built
> > using JDK8 for Scala 2.12.
> >
> > Important code changes since RC0 (non-docs, non system tests):
> >
> > * KAFKA-4728; KafkaConsumer#commitSync should copy its input
> > * KAFKA-4441; Monitoring incorrect during topic creation and
> deletion
> > * KAFKA-4734; Trim the time index on old segments
> > * KAFKA-4725; Stop leaking messages in produce request body when
> > requests
> > are delayed
> > * KAFKA-4716: Fix case when controller cannot be reached
> >
> > Release notes for the 0.10.2.0 release:
> > http://home.apache.org/~ewencp/kafka-0.10.2.0-rc1/
> RELEASE_NOTES.html
> >
> > *** Please download, test and vote by Monday, Feb 13, 5pm PT ***
> >
> > Kafka's KEYS file containing PGP keys we use to sign the release:
> > http://kafka.apache.org/KEYS
> >
> > * Release artifacts to be voted upon (source and binary):
> > http://home.apache.org/~ewencp/kafka-0.10.2.0-rc1/
> >
> > * Maven artifacts to be voted upon:
> > https://repository.apache.org/content/groups/staging/
> >
> > * Javadoc:
> > http://home.apache.org/~ewencp/kafka-0.10.2.0-rc1/javadoc/
> >
> > * Tag to be voted upon (off 0.10.2 branch) is the 0.10.2.0 tag:
> > https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=
> > e825b7994bf8c8c4871d1e0973e287e6d31c7ae4
> >
> >
> > * Documentation:
> > http://kafka.apache.org/0102/documentation.html
> >
> > * Protocol:
> > http://kafka.apache.org/0102/protocol.html
> >
> > * Successful Jenkins builds for the 0.10.2 branch:
> > Unit/integration tests: https://builds.apache.org/job/
> > kafka-0.10.2-jdk7/74/
> > System tests: https://jenkins.confluent.io/
> > job/system-test-kafka-0.10.2/25/
> >
> > /**
> >
> > Thanks,
> > Ewen
> >
> >
> >
>
>
> --
> -- Guozhang
>
>
>


[jira] [Comment Edited] (KAFKA-2729) Cached zkVersion not equal to that in zookeeper, broker not recovering.

2017-02-14 Thread Prasanna Gautam (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15865486#comment-15865486
 ] 

Prasanna Gautam edited comment on KAFKA-2729 at 2/14/17 9:49 AM:
-

This is still replicable in Kafka 0.10.1.1 when Kafka brokers are partitioned 
from each other and zookeeper gets disconnected from the brokers briefly and 
comes back. This situation leads to brokers getting stuck in comparing Cached 
zkVersion and unable to expand the ISR. 

The code in Partition.scala does not seem to be handling enough error 
conditions other than the stale zkVersion. In addition to skipping in the 
current loop, I think it should reconnect to zookeeper to update the current 
state and version. 

Here's a suggestion to do this.. doing it asynchronously doesn't break the 
existing flow and you can update the state. ZkVersion may not be the only thing 
to update here.

{code}
val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.map(r 
=> r.brokerId).toList, zkVersion)
val (updateSucceeded,newVersion) = 
ReplicationUtils.updateLeaderAndIsr(zkUtils, topic, partitionId,
  newLeaderAndIsr, controllerEpoch, zkVersion)

if(updateSucceeded) {
  replicaManager.recordIsrChange(new TopicAndPartition(topic, partitionId))
  inSyncReplicas = newIsr
  zkVersion = newVersion
  trace("ISR updated to [%s] and zkVersion updated to 
[%d]".format(newIsr.mkString(","), zkVersion))
} else {
  info("Cached zkVersion [%d] not equal to that in zookeeper, skip updating 
ISR".format(zkVersion))
  zkVersion = asyncUpdateTopicPartitionVersion(topic,partitionId)
}
{code}


was (Author: prasincs):
This is still replicable in Kafka 0.10.1.1 when Kafka brokers are partitioned 
from each other and zookeeper gets disconnected from the brokers briefly and 
comes back. This situation leads to brokers getting stuck in comparing Cached 
zkVersion and unable to expand the ISR. 

The code in Partition.scala does not seem to be handling enough error 
conditions other than the stale zkVersion. In addition to skipping in the 
current loop, I think it should reconnect to zookeeper to update the current 
state and version. 

Here's a suggestion to do this.. doing it asynchronously doesn't break the flow 
and you can update the state. ZkVersion may not be the only thing to update 
here.

{code}
val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.map(r 
=> r.brokerId).toList, zkVersion)
val (updateSucceeded,newVersion) = 
ReplicationUtils.updateLeaderAndIsr(zkUtils, topic, partitionId,
  newLeaderAndIsr, controllerEpoch, zkVersion)

if(updateSucceeded) {
  replicaManager.recordIsrChange(new TopicAndPartition(topic, partitionId))
  inSyncReplicas = newIsr
  zkVersion = newVersion
  trace("ISR updated to [%s] and zkVersion updated to 
[%d]".format(newIsr.mkString(","), zkVersion))
} else {
  info("Cached zkVersion [%d] not equal to that in zookeeper, skip updating 
ISR".format(zkVersion))
  zkVersion = asyncUpdateTopicPartitionVersion(topic,partitionId)
}
{code}

> Cached zkVersion not equal to that in zookeeper, broker not recovering.
> ---
>
> Key: KAFKA-2729
> URL: https://issues.apache.org/jira/browse/KAFKA-2729
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.2.1
>Reporter: Danil Serdyuchenko
>
> After a small network wobble where zookeeper nodes couldn't reach each other, 
> we started seeing a large number of undereplicated partitions. The zookeeper 
> cluster recovered, however we continued to see a large number of 
> undereplicated partitions. Two brokers in the kafka cluster were showing this 
> in the logs:
> {code}
> [2015-10-27 11:36:00,888] INFO Partition 
> [__samza_checkpoint_event-creation_1,3] on broker 5: Shrinking ISR for 
> partition [__samza_checkpoint_event-creation_1,3] from 6,5 to 5 
> (kafka.cluster.Partition)
> [2015-10-27 11:36:00,891] INFO Partition 
> [__samza_checkpoint_event-creation_1,3] on broker 5: Cached zkVersion [66] 
> not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
> {code}
> This happened for all of the topics on the affected brokers. Both brokers only 
> recovered after a restart. Our own investigation yielded nothing; I was hoping you 
> could shed some light on this issue. Possibly if it's related to: 
> https://issues.apache.org/jira/browse/KAFKA-1382 , however we're using 
> 0.8.2.1.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-2729) Cached zkVersion not equal to that in zookeeper, broker not recovering.

2017-02-14 Thread Prasanna Gautam (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15865486#comment-15865486
 ] 

Prasanna Gautam commented on KAFKA-2729:


This is still reproducible in Kafka 0.10.1.1 when the Kafka brokers are 
partitioned from each other and ZooKeeper briefly disconnects from the brokers 
and then comes back. In that situation the brokers get stuck comparing the 
cached zkVersion and are unable to expand the ISR. 

The code in Partition.scala does not seem to handle error conditions beyond 
the stale zkVersion. Instead of just skipping the update in the current loop, 
I think it should also re-read the state from ZooKeeper to refresh the cached 
version. 

Here's a suggestion for doing this; doing it asynchronously doesn't break the 
flow, and it lets the state be refreshed. The zkVersion may not be the only 
thing that needs to be updated here.

{code}
val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.map(r => r.brokerId).toList, zkVersion)
val (updateSucceeded, newVersion) = ReplicationUtils.updateLeaderAndIsr(zkUtils, topic, partitionId,
  newLeaderAndIsr, controllerEpoch, zkVersion)

if (updateSucceeded) {
  replicaManager.recordIsrChange(new TopicAndPartition(topic, partitionId))
  inSyncReplicas = newIsr
  zkVersion = newVersion
  trace("ISR updated to [%s] and zkVersion updated to [%d]".format(newIsr.mkString(","), zkVersion))
} else {
  info("Cached zkVersion [%d] not equal to that in zookeeper, skip updating ISR".format(zkVersion))
  zkVersion = asyncUpdateTopicPartitionVersion(topic, partitionId)
}
{code}
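
For illustration, here is a minimal sketch of what the hypothetical 
asyncUpdateTopicPartitionVersion helper referenced above could look like. It is 
not existing Kafka code; the name, the explicit zkUtils parameter, and the 
synchronous shape are assumptions. It simply re-reads the partition's 
leader-and-ISR znode and returns the znode version currently stored in 
ZooKeeper, so the cached value can be refreshed (in the actual suggestion this 
would be scheduled asynchronously):

{code}
import kafka.utils.ZkUtils

// Hypothetical helper (not part of Kafka): re-read the leader-and-ISR znode
// for the partition and return the znode version ZooKeeper currently holds,
// so the broker's cached zkVersion can be refreshed instead of staying stale.
def asyncUpdateTopicPartitionVersion(zkUtils: ZkUtils, topic: String, partitionId: Int): Int = {
  val path = ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partitionId)
  zkUtils.readDataMaybeNull(path) match {
    case (Some(_), stat) => stat.getVersion  // znode exists: adopt ZooKeeper's current version
    case (None, _)       => -1               // znode is gone: signal that a full state refresh is needed
  }
}
{code}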

> Cached zkVersion not equal to that in zookeeper, broker not recovering.
> ---
>
> Key: KAFKA-2729
> URL: https://issues.apache.org/jira/browse/KAFKA-2729
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.2.1
>Reporter: Danil Serdyuchenko
>
> After a small network wobble where zookeeper nodes couldn't reach each other, 
> we started seeing a large number of undereplicated partitions. The zookeeper 
> cluster recovered, however we continued to see a large number of 
> undereplicated partitions. Two brokers in the kafka cluster were showing this 
> in the logs:
> {code}
> [2015-10-27 11:36:00,888] INFO Partition 
> [__samza_checkpoint_event-creation_1,3] on broker 5: Shrinking ISR for 
> partition [__samza_checkpoint_event-creation_1,3] from 6,5 to 5 
> (kafka.cluster.Partition)
> [2015-10-27 11:36:00,891] INFO Partition 
> [__samza_checkpoint_event-creation_1,3] on broker 5: Cached zkVersion [66] 
> not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
> {code}
> For all of the topics on the affected brokers. Both brokers only recovered 
> after a restart. Our own investigation yielded nothing, I was hoping you 
> could shed some light on this issue. Possibly if it's related to: 
> https://issues.apache.org/jira/browse/KAFKA-1382 , however we're using 
> 0.8.2.1.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: Correct prefetching of data to KTable-like structure on application startup

2017-02-14 Thread Jan Lukavský

Hi Matthias,

I understand that the local cache will not be automatically cleared, but 
that is not an issue for me now.


The problem I see is still the same as at the beginning - even caching 
data to RocksDB in the KafkaStreams implementation might (I would say) 
suffer from this issue. When using time-based data retention (for 
whatever reason, maybe in combination with log compaction, but the issue 
is there irrespective of whether log compaction is used or not), it is 
possible that some partition will report a nonzero "next" offset but 
will not be able to deliver any message to the KafkaConsumer (because 
the partition was emptied by data retention). The consumer will 
therefore never finish materializing the topic into the local store 
(either RocksDB or any other cache) and will not be able to start 
processing the KStream. If I understand the problem right, using 
timestamps will not help either, because there must be some sort of 
vector clock with a time dimension for each input partition, and the 
empty partition will never be able to advance its timestamp, so the 
whole system will remain blocked at timestamp 0, because the vector 
clock usually takes the minimum of all time dimensions.
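
A minimal sketch of one way around the blocked materialization described 
above, assuming a plain KafkaConsumer already assigned to the state topic's 
partitions and only the public 0.10.1+ consumer API (topic names and type 
parameters are placeholders): treat a partition as caught up once the 
consumer's position has reached the broker-reported end offset, instead of 
waiting for a record at the last known offset, which retention may already 
have deleted.

{code}
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

// Returns true once every state-topic partition has been read up to the
// broker's current end offset. A partition emptied by retention reports
// position == endOffset even though it can never deliver another record,
// so it no longer blocks the materialization of the local store.
def caughtUp(consumer: KafkaConsumer[String, String], partitions: Seq[TopicPartition]): Boolean = {
  val endOffsets = consumer.endOffsets(partitions.asJava).asScala
  partitions.forall(tp => consumer.position(tp) >= endOffsets(tp))
}
{code}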


Does that make any sense, or I am doing something fundamentally wrong? :)

Thanks again for any thoughts,

 Jan


On 02/13/2017 06:37 PM, Matthias J. Sax wrote:

Jan,

brokers with version 0.10.1 or higher allow setting both topic cleanup
policies in combination:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-71%3A+Enable+log+compaction+and+deletion+to+co-exist

However, this will only delete data in your changelog topic but not in
your RocksDB store -- if you want data deleted in RocksDB as well, you
would need to send tombstone messages for those keys. It's kinda tricky
to get this done.
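
For illustration, a minimal sketch of sending such a tombstone from a plain 
producer - a record with a null value for the key that compaction should 
eventually drop. Broker address, topic name, and serializers are placeholders:

{code}
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")  // placeholder
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

val producer = new KafkaProducer[String, String](props)
// A record with a null value is a tombstone: once compaction runs, the key is
// removed from the changelog, and a restoring instance will no longer load it.
producer.send(new ProducerRecord[String, String]("my-changelog-topic", "key-to-delete", null))
producer.close()
{code}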

An "brute force" alternative would be, stop the application, delete the
local state directory, and restart. This will force Streams to recreate
the RocksDB files from the changelog and thus only loading keys that got
not deleted. But this is of course a quite expensive approach and you
should be very careful about using it.


-Matthias


On 2/13/17 12:25 AM, Jan Lukavský wrote:

Hi Michael,

sorry for my late answer. Configuring the topic as you suggest is one
option (and I will configure it that way), but I wanted to combine the
two data retention mechanisms (if possible). I would like to use log
compaction, so that I will always get at least the last message for a
given key, but I would also like to use the classical temporal data
retention, which would function as a sort of TTL for the keys - if a key
doesn't get an update for the configured period of time, it could be
removed. That way I could ensure that outdated keys are eventually removed.

Is there any other option for this? And can Kafka be configured this way?

Best,

  Jan

On 02/09/2017 12:08 PM, Michael Noll wrote:

Jan,


   - if I don't send any data to a Kafka partition for a period longer than
the data retention interval, then all data from the partition is wiped out

If I interpret your first and second message in this email thread
correctly, then you are talking only about your "state topic" here, i.e.
the topic that you read into a KTable.  You should configure this topic
to use log compaction, which will ensure that the latest value for a
given key will never be wiped.  So even if you don't send any data to a
Kafka partition of this (now log-compacted) "state topic" for a long
period of time, you'd always have access to (at least) the latest value
for every key.
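
A minimal sketch of creating such a log-compacted "state topic" up front, 
assuming the Scala AdminUtils/ZkUtils API that ships with the broker; the 
connection string, topic name, partition count, and replication factor are 
placeholders:

{code}
import java.util.Properties
import kafka.admin.AdminUtils
import kafka.utils.ZkUtils

val zkUtils = ZkUtils("localhost:2181", 30000, 30000, isZkSecurityEnabled = false)  // placeholder
try {
  val topicConfig = new Properties()
  topicConfig.put("cleanup.policy", "compact")  // keep at least the latest value per key
  AdminUtils.createTopic(zkUtils, "state-topic", 3, 2, topicConfig)
} finally {
  zkUtils.close()
}
{code}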

Would that help?

-Michael





On Thu, Feb 9, 2017 at 10:16 AM, Jan Lukavský  wrote:


Hi Matthias,

first of all, thanks for your answer. Sorry if I didn't explain the
problem well; I didn't want to dig too much into detail, in order to
focus on the important part, and maybe as a result it was not clear.

My fault, I will try to explain it again. I have two KafkaConsumers in
two separate threads consuming from two topics - let's call the first
one the "stream topic" (processed like a KStream) and the second one the
"state topic" (processed like a KTable). The state topic carries
persistent data that I need in order to process the stream topic, so I
need to cache the state topic locally before starting consumption of the
stream topic. When the application is running normally, there seems to
be no issue with this, because the state topic is updated asynchronously
and I use internal locks to synchronize the processing inside the
application. So far, everything is fine.

The problem might arise when the application starts - then I do the
following:

   - lock processing of the stream topic (because I don't have the state
topic cached)

   - read the current offset N from the state topic (which gives me the
offset of the message expected next, i.e. a message that has not yet
been written)

   - reset offset of the state topic to beginning 

Re: [DISCUSS] KIP-120: Cleanup Kafka Streams builder API

2017-02-14 Thread Guozhang Wang
Some thoughts on the mixed usage of the DSL / PAPI:

There were some suggestions on mixing the usage of the DSL and PAPI:
https://issues.apache.org/jira/browse/KAFKA-3455, and after thinking about it
a bit more carefully, I'd rather not recommend users follow this pattern,
since in the DSL this can always be achieved via process() / transform().
Hence I think it is okay to prevent such patterns in the new APIs. And for
the same reason, I think we can remove KStreamBuilder#newName() from the
public APIs.
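
As a small illustration of that process() / transform() escape hatch (a
sketch only - the transformer and what it does are made up for the example,
not part of any KIP): a Transformer gives DSL code access to the
ProcessorContext, which covers most of what mixing in the PAPI directly
would otherwise be used for.

{code}
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream.{Transformer, TransformerSupplier}
import org.apache.kafka.streams.processor.ProcessorContext

// Hypothetical example: tag every value with the partition it came from,
// something that otherwise needs direct Processor API access.
class PartitionTagTransformer extends Transformer[String, String, KeyValue[String, String]] {
  private var context: ProcessorContext = _

  override def init(context: ProcessorContext): Unit = { this.context = context }

  override def transform(key: String, value: String): KeyValue[String, String] =
    new KeyValue(key, s"partition-${context.partition()}:" + value)

  override def punctuate(timestamp: Long): KeyValue[String, String] = null  // no scheduled output (0.10.x API)

  override def close(): Unit = ()
}

// Usage inside the DSL (stream is a KStream[String, String]):
//   stream.transform(new TransformerSupplier[String, String, KeyValue[String, String]] {
//     override def get(): Transformer[String, String, KeyValue[String, String]] = new PartitionTagTransformer
//   })
{code}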

About KStreamBuilder#addInternalTopic(): I admit that we could optimize the
built topology for cases like "table.groupBy(..).aggregate(fn1);
table.groupBy(/* same groupBy key */).aggregate(fn2);" by reusing the same
repartition topic, and we have plans to apply query optimization to the
topology-building process. For now, though, I'd suggest using
KStream#through() to reuse the topic.
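
A rough sketch of that through() workaround, with placeholder topic and
store names and assuming the explicit topic is created and partitioned by
the grouping key up front: the keyed stream is written once and both
aggregations read from that single topic, instead of each one creating its
own internal repartition topic.

{code}
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.kstream.{KStream, KStreamBuilder, Reducer}

val builder = new KStreamBuilder()

// Already keyed by the grouping key; topic and store names are placeholders.
val keyed: KStream[String, String] = builder.stream(Serdes.String(), Serdes.String(), "events-by-key")

// Materialize once into an explicit, pre-created topic and reuse it downstream.
val shared: KStream[String, String] = keyed.through(Serdes.String(), Serdes.String(), "events-repartitioned")

val counts = shared.groupByKey(Serdes.String(), Serdes.String()).count("counts-store")
val latest = shared.groupByKey(Serdes.String(), Serdes.String()).reduce(
  new Reducer[String] { override def apply(v1: String, v2: String): String = v2 },  // keep the newest value
  "latest-store")
{code}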

And about printing the topology for debuggability: I agree this is a
potential drawback, and I'd suggest maintaining some functionality to build a
"dry topology" as Mathieu suggested; the difficulty is that, internally, we
need a different "copy" of the topology for each thread so that they will
not share any state, so we cannot directly pass the topology into
KafkaStreams instead of the topology builder. So how about adding a
`TopologyBuilder#toString` function which calls `build()` internally and then
prints the built dry topology?


Guozhang


On Tue, Feb 7, 2017 at 6:32 AM, Mathieu Fenniak <
mathieu.fenn...@replicon.com> wrote:

> On Mon, Feb 6, 2017 at 2:35 PM, Matthias J. Sax 
> wrote:
>
> > - adding KStreamBuilder#topologyBuilder() seems like a good idea to
> > address any concern with limited access to TopologyBuilder and DSL/PAPI
> > mix-and-match approach. However, we should try to cover as much as
> > possible with #process(), #transform() etc.
> >
>
> That sounds like it'll work for me.
>
>
> > - about TopologyBuilder.nodeGroups & TopologyBuilder.build: not sure
> > what analysis you do -- there is also KafkaStreams#toString() that
> > describes the topology/DAG of the job. @Mathieu: Could you use this for
> > your analysis?
> >
>
> Well, I'd like to be able to output a graphviz diagram of my processor
> topology.  I am aware of KafkaStreams#toString(), but it isn't the format
> I want; if I remember correctly, I found it was ambiguous to parse &
> transform, and it also has the limitation of requiring a running and
> processing application, as toString() doesn't return anything useful until
> the consumer stream threads are running.
>
> What I've whipped up with the existing ProcessorTopology API (
> https://gist.github.com/mfenniak/04f9c0bea8a1a2e0a747d678117df9f7) just
> builds a "dry" topology (ie. no data being processed) and outputs a graph.
> It's hooked into my app so that I can run with a specific command-line
> option to output the graph without having to start the processor.
>
> It's not the worst thing in the world to lose, or to have to jump through
> some reflection hoops to do. :-)  Perhaps a better approach would be to
> have an API designed specifically for this kind of introspection,
> independent of the much more commonly used API to build a topology.
>
> Mathieu
>



-- 
-- Guozhang


Re: [VOTE] 0.10.2.0 RC1

2017-02-14 Thread Moczarski, Swen
Thanks a lot for the fast fix!

I tested my code with the fix and it works fine without exception.

Regards,
Swen

On 2/13/17, 8:38 PM, "Guozhang Wang"  wrote:

Thanks for reporting the JIRA Swen.

Jason has a patch ready under KAFKA-4761 and I have reviewed it. You could
try it out and see if it has fixed your issue.

After this is merged in, we will need another RC.


Guozhang

On Mon, Feb 13, 2017 at 9:52 AM, Moczarski, Swen <
smoczar...@ebay-kleinanzeigen.de> wrote:

> +0 (non-binding)
>
> Thanks for compiling a new release candidate.
>
> I get a NullPointerException when setting batch.size=0 on the producer
> config. This worked before with 0.10.1.1.
> See https://issues.apache.org/jira/browse/KAFKA-4761
>
> Regards,
> Swen
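
(For reference, a minimal sketch of the kind of producer configuration
described above; the broker address, topic, and serializers are
placeholders, and the only relevant setting is batch.size=0, which worked
on 0.10.1.1 and reportedly hits the NullPointerException tracked in
KAFKA-4761 on this RC.)

{code}
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")  // placeholder
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("batch.size", "0")  // disable batching; the setting reported to trigger the NPE

val producer = new KafkaProducer[String, String](props)
producer.send(new ProducerRecord[String, String]("test-topic", "key", "value"))
producer.close()
{code}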
>
> On 2/10/17, 5:51 PM, "Ewen Cheslack-Postava"  wrote:
>
> Hello Kafka users, developers and client-developers,
>
> This is RC1 for release of Apache Kafka 0.10.2.0.
>
> This is a minor version release of Apache Kafka. It includes 19 new KIPs.
> See the release notes and release plan (https://cwiki.apache.org/
> confluence/display/KAFKA/Release+Plan+0.10.2.0) for more details. A few
> feature highlights: SASL-SCRAM support, improved client compatibility to
> allow use of clients newer than the broker, session windows and global
> tables in the Kafka Streams API, single message transforms in the Kafka
> Connect framework.
>
> Important note: in addition to the artifacts generated using JDK7 for
> Scala 2.10 and 2.11, this release also includes experimental artifacts
> built using JDK8 for Scala 2.12.
>
> Important code changes since RC0 (non-docs, non system tests):
>
> * KAFKA-4728; KafkaConsumer#commitSync should copy its input
> * KAFKA-4441; Monitoring incorrect during topic creation and deletion
> * KAFKA-4734; Trim the time index on old segments
> * KAFKA-4725; Stop leaking messages in produce request body when requests
>   are delayed
> * KAFKA-4716: Fix case when controller cannot be reached
>
> Release notes for the 0.10.2.0 release:
> http://home.apache.org/~ewencp/kafka-0.10.2.0-rc1/RELEASE_NOTES.html
>
> *** Please download, test and vote by Monday, Feb 13, 5pm PT ***
>
> Kafka's KEYS file containing PGP keys we use to sign the release:
> http://kafka.apache.org/KEYS
>
> * Release artifacts to be voted upon (source and binary):
> http://home.apache.org/~ewencp/kafka-0.10.2.0-rc1/
>
> * Maven artifacts to be voted upon:
> https://repository.apache.org/content/groups/staging/
>
> * Javadoc:
> http://home.apache.org/~ewencp/kafka-0.10.2.0-rc1/javadoc/
>
> * Tag to be voted upon (off 0.10.2 branch) is the 0.10.2.0 tag:
> https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=e825b7994bf8c8c4871d1e0973e287e6d31c7ae4
>
>
> * Documentation:
> http://kafka.apache.org/0102/documentation.html
>
> * Protocol:
> http://kafka.apache.org/0102/protocol.html
>
> * Successful Jenkins builds for the 0.10.2 branch:
> Unit/integration tests: https://builds.apache.org/job/kafka-0.10.2-jdk7/74/
> System tests: https://jenkins.confluent.io/job/system-test-kafka-0.10.2/25/
>
> /**
>
> Thanks,
> Ewen
>
>
>


-- 
-- Guozhang