Re: [VOTE] KIP-98: Exactly Once Delivery and Transactional Messaging

2017-02-08 Thread Guozhang Wang
Hello community,

We have made a few changes based on the comments in this thread as well as
the DISCUSS thread. Summary of the changes can be found in the update
history (2017/02/03 and 2017/02/07) of the design doc:

https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8/edit#heading=h.i8usz7jt7ms6

And details of the changes can be found here:

https://www.mail-archive.com/dev@kafka.apache.org/msg66069.html


We'd like to solicit another round of voting for 72 hours due to these
changes, and people who have voted before on this thread are welcome to
re-review and vote again. Thanks!


Guozhang


On Fri, Feb 3, 2017 at 1:57 PM, Rajini Sivaram wrote:

> +1 (non-binding)
>
> (with additional authorization from Jason's note in the discussion thread)
>
>
> On Fri, Feb 3, 2017 at 1:10 AM, Apurva Mehta  wrote:
>
> > The wiki has been updated with a section on authorization, as well a
> > summary of the message format changes.
> >
> > On Thu, Feb 2, 2017 at 9:38 AM, Jason Gustafson 
> > wrote:
> >
> > > Thanks Tom, we'll update the wiki to reflect all the movement on the
> > design
> > > document. Did you have a specific concern with the new ACLs?
> > >
> > > -Jason
> > >
> > > On Thu, Feb 2, 2017 at 6:49 AM, Ismael Juma  wrote:
> > >
> > > > Hi Tom,
> > > >
> > > > That is a good point. During the discussion, it was agreed that
> changes
> > > to
> > > > public interfaces (message format, protocol, ACLs, etc.) would be
> > copied
> > > to
> > > > the wiki once the things had settled down, but it looks like that
> > hasn't
> > > > been done yet. I agree that it makes sense to do it before people
> vote
> > on
> > > > it.
> > > >
> > > > Ismael
> > > >
> > > > On Thu, Feb 2, 2017 at 2:42 PM, Tom Crayford 
> > > wrote:
> > > >
> > > > > -1 (non-binding)
> > > > >
> > > > > I've been slow at keeping up with the KIP and the discussion
> thread.
> > > This
> > > > > is an exciting and quite complex new feature, which provides great
> > new
> > > > > functionality.
> > > > >
> > > > > There's a thing I noticed missing from the KIP that's present in
> the
> > > > google
> > > > > doc - the doc talks about ACLs for TransactionalId. If those are
> > going
> > > to
> > > > > land with the KIP, I think they should be included in the KIP
> itself,
> > > as
> > > > > new ACLs are significant security changes.
> > > > >
> > > > > On Thu, Feb 2, 2017 at 10:04 AM, Eno Thereska <
> > eno.there...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > +1 (non-binding).
> > > > > >
> > > > > > Excellent work and discussions!
> > > > > >
> > > > > > Eno
> > > > > > > On 2 Feb 2017, at 04:13, Guozhang Wang 
> > wrote:
> > > > > > >
> > > > > > > Hi all,
> > > > > > >
> > > > > > > We would like to start the voting process for KIP-98. The KIP
> can
> > > be
> > > > > > found
> > > > > > > at
> > > > > > >
> > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > > > 98+-+Exactly+Once+Delivery+and+Transactional+Messaging
> > > > > > >
> > > > > > > Discussion thread can be found here:
> > > > > > >
> > > > > > > http://search-hadoop.com/m/Kafka/uyzND1jwZrr7HRHf?subj=+
> > > > > > DISCUSS+KIP+98+Exactly+Once+Delivery+and+Transactional+Messaging
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > --
> > > > > > > -- Guozhang
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>



-- 
-- Guozhang


[jira] [Assigned] (KAFKA-4716) Fix logic for re-checking if internal topic is ready

2017-02-08 Thread Eno Thereska (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4716?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eno Thereska reassigned KAFKA-4716:
---

Assignee: Eno Thereska

> Fix logic for re-checking if internal topic is ready
> 
>
> Key: KAFKA-4716
> URL: https://issues.apache.org/jira/browse/KAFKA-4716
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Eno Thereska
>Assignee: Eno Thereska
>Priority: Blocker
>  Labels: architecture
> Fix For: 0.10.2.0
>
>
> In InternalTopicManager, we have a hardcoded constant MAX_TOPIC_READY_TRY 
> that is set to 5. We shouldn't hardcode the retry count; the readiness check 
> should be based on a timeout, not on a fixed number of retries.
> There are cases when the code in makeReady tries to create a topic but then 
> fails because the controller is currently in transition and we get a warning: 
> " Could not create internal topics: Could not create topic:  due 
> to This is not the correct controller for this cluster." The code proceeds to 
> retry MAX_TOPIC_READY_TRY times in a tight loop, and eventually fails. We 
> should have a retry backoff (perhaps just use retry.backoff.ms) and a timeout 
> (perhaps just use request.timeout.ms) instead of a number of retries.
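
For illustration, a timeout-plus-backoff loop of the shape being suggested could look
roughly like the sketch below; the readiness/creation callbacks and the two timing
parameters are placeholders, not the actual InternalTopicManager API.

{code}
import java.util.function.BooleanSupplier;

// Sketch only: wait for a topic to become ready, bounded by a timeout and using a
// backoff between attempts instead of a fixed number of retries in a tight loop.
final class TopicReadyWaiter {
    static void awaitReady(BooleanSupplier topicExists, Runnable tryCreateTopic,
                           long requestTimeoutMs, long retryBackoffMs)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + requestTimeoutMs;   // request.timeout.ms
        while (!topicExists.getAsBoolean()) {
            if (System.currentTimeMillis() > deadline) {
                throw new IllegalStateException("Timed out waiting for internal topic");
            }
            tryCreateTopic.run();          // may fail while the controller is in transition
            Thread.sleep(retryBackoffMs);  // retry.backoff.ms instead of a tight retry loop
        }
    }
}
{code}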



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-4716) Fix logic for re-checking if internal topic is ready

2017-02-08 Thread Eno Thereska (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4716?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eno Thereska updated KAFKA-4716:

Fix Version/s: (was: 0.10.3.0)
   0.10.2.0

> Fix logic for re-checking if internal topic is ready
> 
>
> Key: KAFKA-4716
> URL: https://issues.apache.org/jira/browse/KAFKA-4716
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Eno Thereska
>Priority: Blocker
>  Labels: architecture
> Fix For: 0.10.2.0
>
>
> In InternalTopicManager, we have a hardcoded constant MAX_TOPIC_READY_TRY 
> that is set to 5. We shouldn't hardcode the retry count; the readiness check 
> should be based on a timeout, not on a fixed number of retries.
> There are cases when the code in makeReady tries to create a topic but then 
> fails because the controller is currently in transition and we get a warning: 
> " Could not create internal topics: Could not create topic:  due 
> to This is not the correct controller for this cluster." The code proceeds to 
> retry MAX_TOPIC_READY_TRY times in a tight loop, and eventually fails. We 
> should have a retry backoff (perhaps just use retry.backoff.ms) and a timeout 
> (perhaps just use request.timeout.ms) instead of a number of retries.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-4716) Fix logic for re-checking if internal topic is ready

2017-02-08 Thread Eno Thereska (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4716?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eno Thereska updated KAFKA-4716:

Priority: Blocker  (was: Major)

> Fix logic for re-checking if internal topic is ready
> 
>
> Key: KAFKA-4716
> URL: https://issues.apache.org/jira/browse/KAFKA-4716
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Eno Thereska
>Priority: Blocker
>  Labels: architecture
> Fix For: 0.10.3.0
>
>
> In InternalTopicManager, we have a hardcoded constant MAX_TOPIC_READY_TRY 
> that is set to 5. We shouldn't hardcode the retry count; the readiness check 
> should be based on a timeout, not on a fixed number of retries.
> There are cases when the code in makeReady tries to create a topic but then 
> fails because the controller is currently in transition and we get a warning: 
> " Could not create internal topics: Could not create topic:  due 
> to This is not the correct controller for this cluster." The code proceeds to 
> retry MAX_TOPIC_READY_TRY times in a tight loop, and eventually fails. We 
> should have a retry backoff (perhaps just use retry.backoff.ms) and a timeout 
> (perhaps just use request.timeout.ms) instead of a number of retries.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: Correct prefetching of data to KTable-like structure on application startup

2017-02-08 Thread Matthias J. Sax
Jan,

Your scenario is quite complex and I am not sure I understood every
part of it. Let me try to break it down:

> In my scenario on startup, I want to read all data from a topic (or a subset 
> of its partitions),
> wait until all the old data has been cached and then start processing of a 
> different stream

That is hard to accomplish in general. Kafka Streams internally uses
KafkaConsumer (one instance per StreamThread) and thus relies on the
consumer's behavior with regard to poll(). Hence, Streams cannot control
in detail what data will be fetched from the brokers.

Furthermore, Streams follows its own internal strategy to pick a record
(from the available ones returned by poll()), and you cannot control in
your code (at least not directly) which record will be picked.

Basically, Streams tries to process records in "timestamp order", i.e.,
based on the timestamp returned by the TimestampExtractor. So you can
"influence" the processing order via the record timestamps (as far as you
can influence them) and/or by providing a custom TimestampExtractor.
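
For example, a custom extractor could look roughly like the following sketch (this is
illustrative only, assuming the single-argument extract() of the 0.10.x API; the topic
name and the head-start value are placeholders):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

// Give records of the "table" topic slightly smaller timestamps so Streams tends to
// pick them before stream-side records carrying the same original timestamp.
public class TablePriorityTimestampExtractor implements TimestampExtractor {
    private static final long HEAD_START_MS = 1000L;          // arbitrary example value

    @Override
    public long extract(final ConsumerRecord<Object, Object> record) {
        final long ts = record.timestamp();
        return "my-table-topic".equals(record.topic()) ? ts - HEAD_START_MS : ts;
    }
}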

In your example, you might want the records you want to process first
(the KTable side) to have smaller timestamps (i.e., be earlier) than the
records from your KStream. But even this will only give you "best effort"
behavior, and it can happen that a KStream record is processed before all
KTable records are processed. It's a known issue but hard to resolve.

> when the specific partition doesn't get any message within the retention 
> period,
> then I end up stuck trying to prefetch data to the "KTable" - this is because 
> I get
> the offset of the last message (plus 1) from the broker, but I don't get any 
> data
> ever (until I send a message to the partition)

Cannot follow here: if there is no data, then you of course cannot
process any data -- so why do you end up being stuck?

> The problem I see here is that kafka tells me what the last offset in a 
> partition is,
> but there is no upper bound on when a first message will arrive,

In general, the latency between a data append at the broker and data
receipt at a consumer is rather small. So even though there is strictly no
upper bound on when a message gets delivered, this should not be an issue
in practice. Or do I misunderstand something?

> even though I reset the offset and start reading from the beginning of a 
> partition.

How does this relate? Cannot follow.

> My question is, is it a possibility not to clear the whole partition, but to 
> always keep at least the last message?

Not with the regular retention policy -- I am not sure whether log
compaction can help here.

> That way, the client would always get at least the last message, can 
> therefore figure out
> it is at the end of the partition (reading the old data) and start processing.

Why is this required? If the client's offset is the same as "endOfLog"
for each partition, you can figure out that there is nothing to read. So
why would you need the last old message to figure this out?
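
For example, with the plain consumer you could check this roughly as follows (a sketch;
class and method names are placeholders):

import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

final class CatchUpCheck {
    // Compare the consumer's current position to the end offsets to decide whether the
    // "old" data has been fully read -- this works even for an empty partition.
    static boolean caughtUp(KafkaConsumer<?, ?> consumer, Set<TopicPartition> partitions) {
        Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);
        for (TopicPartition tp : partitions) {
            if (consumer.position(tp) < endOffsets.get(tp)) {
                return false;   // still behind the end of this partition
            }
        }
        return true;
    }
}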


-Matthias



On 2/7/17 3:46 AM, Jan Lukavský wrote:
> Hi all,
> 
> I have a question about how to do correct caching in a KTable-like structure
> on application startup. I'm not sure whether this belongs on the user or dev
> mailing list, so sorry if I've chosen the wrong one. What I have observed so
> far:
> 
>  - if I don't send any data to a Kafka partition for a period longer
> than the data retention interval, then all data from the partition is
> wiped out
> 
>  - the index file is not cleared (which is obvious, it has to keep track
> of the next offset to assign to a new message)
> 
> In my scenario on startup, I want to read all data from a topic (or a
> subset of its partitions), wait until all the old data has been cached
> and then start processing of a different stream (basically I'm doing a
> join of KStream and KTable, but I have implemented it manually due to
> some special behavior). Now, what is the issue here - when the specific
> partition doesn't get any message within the retention period, then I
> end up stuck trying to prefetch data to the "KTable" - this is because I
> get the offset of the last message (plus 1) from the broker, but I don't
> get any data ever (until I send a message to the partition). The problem
> I see here is that kafka tells me what the last offset in a partition
> is, but there is no upper bound on when a first message will arrive,
> even though I reset the offset and start reading from the beginning of a
> partition. My question is, is it a possibility not to clear the whole
> partition, but to always keep at least the last message? That way, the
> client would always get at least the last message, can therefore figure
> out it is at the end of the partition (reading the old data) and start
> processing. I believe the KTable implementation could have a very
> similar issue. Or is there another way around this? I could add a timeout,
> but this seems a little fragile.
> 
> Thanks in advance for any suggestions and opinions,
> 
>  Jan
> 



signature.asc

Re: [VOTE] KIP-111 Kafka should preserve the Principal generated by the PrincipalBuilder while processing the request received on socket channel, on the broker.

2017-02-08 Thread Mayuresh Gharat
Hi Jun,

Thanks for the review. Please find the responses inline.

1. It seems the problem that you are trying to address is that java
principal returned from KafkaChannel may have additional fields than name
that are needed during authorization. Have you considered a customized
PrincipleBuilder that extracts all needed fields from java principal and
squeezes them as a json in the name of the returned principal? Then, the
authorizer can just parse the json and extract needed fields.
---> Yes, we had thought about this. We use a third-party library that takes
in the passed-in cert and creates the Principal. This Principal is then
used by the library to make the decision (ALLOW/DENY) when we call it in
the Authorizer. It does not have an API to create the Principal from a
String. Even if it did, we would still have to be aware of the internal
details of the library, like the field values it creates from the certs,
the defaults, and so on.
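
For context, a builder under the current (pre-KIP) PrincipalBuilder interface would look
roughly like the sketch below; CertParser here is just a stand-in for the third-party
library, not a real API.

import java.security.Principal;
import java.util.Map;

import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.network.Authenticator;
import org.apache.kafka.common.network.TransportLayer;
import org.apache.kafka.common.security.auth.PrincipalBuilder;

public class CertBasedPrincipalBuilder implements PrincipalBuilder {

    interface CertParser {                       // placeholder for the external library
        Principal parse(Principal peerPrincipal);
    }

    private CertParser parser;

    @Override
    public void configure(Map<String, ?> configs) {
        parser = peer -> peer;                   // real code would build the library from configs
    }

    @Override
    public Principal buildPrincipal(TransportLayer transportLayer,
                                    Authenticator authenticator) throws KafkaException {
        try {
            // Delegate to the library so the returned Principal keeps all cert-derived fields.
            return parser.parse(transportLayer.peerPrincipal());
        } catch (Exception e) {
            throw new KafkaException("Failed to build principal", e);
        }
    }

    @Override
    public void close() throws KafkaException {
        // nothing to clean up in this sketch
    }
}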

2. Could you explain how the default authorizer works now? Currently, the
code just compares the two principal objects. Are we converting the java
principal to a KafkaPrincipal there?
---> The SimpleAclAuthorizer currently expects that the Principal it
fetches from the Session object is an instance of KafkaPrincipal. It then
uses it to compare against the KafkaPrincipal extracted from the stored ACLs.
In this case, we can construct the KafkaPrincipal object on the fly by using
the name of the Principal as follows:

val principal = session.principal
val kafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE,
principal.getName)

I was also planning to get rid of the principalType field in KafkaPrincipal, as
it is always set to "User" in the SocketServer currently. After this
KIP, it will no longer be used in the SocketServer. But to maintain backwards
compatibility of kafka-acls.sh, I preserved it.


3. Do we need to add the following method in PrincipalBuilder? The configs
are already passed in through configure() and an implementation can cache
it and use it in buildPrincipal(). It's also not clear to me where we call
the new and the old method, and whether both will be called or one of them
will be called.
Principal buildPrincipal(Map principalConfigs);
---> My thought was that the configure() method will be used to build the
PrincipalBuilder object itself, following the same pattern by which the
Authorizer gets configured. The buildPrincipal(Map principalConfigs) method
will be used to build individual principals.
Let me give an example, with the kafka-acls.sh :

   - bin/kafka-acls.sh --principalBuilder
   userDefinedPackage.kafka.security.PrincipalBuilder
--principalBuilder-properties
   principalBuilderService.rest.url=URL  --authorizer
   kafka.security.auth.SimpleAclAuthorizer --authorizer-properties
   zookeeper.connect=localhost:2181 --add --allow-principal name=bob
   type=USER_PRINCIPAL --allow-principal name=ALPHA-GAMMA-SERVICE
   type=SERVICE_PRINCIPAL --allow-hosts Host1,Host2 --operations Read,Write
   --topic Test-topic
  1. *userDefinedPackage.kafka.security.PrincipalBuilder* is the user
  defined PrincipalBuilder class.
  2. *principalBuilderService.rest.url=URL* can be a remote service
  that provides you an HTTP endpoint which takes in a set of parameters and
  provides you with the Principal.
  3. *name=bob type=USER_PRINCIPAL* can be used by PrincipalBuilder to
  create UserPrincipal with name as bob
  4. *name=ALPHA-GAMMA-SERVICE type=SERVICE_PRINCIPAL *can be used by
  PrincipalBuilder to create a ServicePrincipal with name as
  ALPHA-GAMMA-SERVICE.
   - This seems more flexible and intuitive to me from the end user's
   perspective.

Principal buildPrincipal(Map principalConfigs) will be called
from the command-line client kafka-acls.sh, while the other API can be called
at runtime when Kafka receives a client request over the request channel.

4. The KIP has "If users use their custom PrincipalBuilder, they will have
to implement their custom Authorizer, as the out-of-the-box Authorizer that
Kafka provides uses KafkaPrincipal." This is not ideal for existing users.
Could we avoid that?
---> Yes, this is possible to avoid if we do point 2.


Thanks,

Mayuresh

On Wed, Feb 8, 2017 at 3:31 PM, Jun Rao  wrote:

> Hi, Mayuresh,
>
> Thanks for the KIP. A few comments below.
>
> 1. It seems the problem that you are trying to address is that java
> principal returned from KafkaChannel may have additional fields than name
> that are needed during authorization. Have you considered a customized
> PrincipleBuilder that extracts all needed fields from java principal and
> squeezes them as a json in the name of the returned principal? Then, the
> authorizer can just parse the json and extract needed fields.
>
> 2. Could you explain how the default authorizer works now? Currently, the
> code just compares the two principal objects. Are we converting the java
> principal to a KafkaPrincipal there?

Re: KIP-121 [VOTE]: Add KStream peek method

2017-02-08 Thread Matthias J. Sax
+1

On 2/8/17 4:51 PM, Gwen Shapira wrote:
> +1 (binding)
> 
> On Wed, Feb 8, 2017 at 4:45 PM, Steven Schlansker
>  wrote:
>> Hi everyone,
>>
>> Thank you for constructive feedback on KIP-121, 
>> KStream.peek(ForeachAction) ;
>> it seems like it is time to call a vote which I hope will pass easily :)
>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-121%3A+Add+KStream+peek+method
>>
>> I believe the PR attached is already in good shape to consider merging:
>>
>> https://github.com/apache/kafka/pull/2493
>>
>> Thanks!
>> Steven
>>
> 
> 
> 



signature.asc
Description: OpenPGP digital signature
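
For context, the method being voted on is a side-effect-only inspection step; usage would
look roughly like this (a sketch based on the KIP, with placeholder topic names):

import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class PeekExample {
    public static void main(String[] args) {
        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, String> stream = builder.stream("input-topic");
        stream.peek((key, value) -> System.out.println(key + " -> " + value))  // side effect only
              .to("output-topic");
        // the builder would then be passed to KafkaStreams as usual
    }
}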


[jira] [Commented] (KAFKA-4709) Error message from Struct.validate() should include the name of the offending field.

2017-02-08 Thread Tegan Snyder (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15859017#comment-15859017
 ] 

Tegan Snyder commented on KAFKA-4709:
-

Is there any way to figure out the field that is causing this error? I'm also 
receiving this error message when using the JDBC Source connector. Thanks.

{code}
org.apache.kafka.connect.errors.DataException: Invalid value: null used for 
required field
at 
org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:212)
at org.apache.kafka.connect.data.Struct.put(Struct.java:215)
at org.apache.kafka.connect.data.Struct.put(Struct.java:204)
at 
io.confluent.connect.jdbc.source.DataConverter.convertFieldValue(DataConverter.java:433)
at 
io.confluent.connect.jdbc.source.DataConverter.convertRecord(DataConverter.java:73)
at 
io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.extractRecord(TimestampIncrementingTableQuerier.java:184)
at 
io.confluent.connect.jdbc.source.JdbcSourceTask.poll(JdbcSourceTask.java:195)
at 
org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:155)
at 
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
{code}
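
Until the exception itself names the field, one way to narrow it down when you control the
Struct being populated is a check along these lines (a diagnostic sketch, not Connect code):

{code}
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;

final class StructDebug {
    // Print every required field of the Struct that is still unset, since the
    // DataException above does not say which one it is.
    static void printMissingRequiredFields(Struct struct) {
        for (Field field : struct.schema().fields()) {
            if (!field.schema().isOptional()
                    && field.schema().defaultValue() == null
                    && struct.getWithoutDefault(field.name()) == null) {
                System.err.println("Missing required field: " + field.name());
            }
        }
    }
}
{code}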

> Error message from Struct.validate() should include the name of the offending 
> field.
> 
>
> Key: KAFKA-4709
> URL: https://issues.apache.org/jira/browse/KAFKA-4709
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Jeremy Custenborder
>Assignee: Jeremy Custenborder
>Priority: Minor
>
> Take a look at this repro.
> {code}
>   @Test
>   public void structValidate() {
> Schema schema = SchemaBuilder.struct()
> .field("one", Schema.STRING_SCHEMA)
> .field("two", Schema.STRING_SCHEMA)
> .field("three", Schema.STRING_SCHEMA)
> .build();
> Struct struct = new Struct(schema);
> struct.validate();
>   }
> {code}
> Any one of the fields could be causing the issue. The following exception is 
> thrown. This makes troubleshooting missing fields in connectors much more 
> difficult.
> {code}
> org.apache.kafka.connect.errors.DataException: Invalid value: null used for 
> required field
> {code}
> The error message should include the offending field or fields.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: KIP-121 [VOTE]: Add KStream peek method

2017-02-08 Thread Bill Bejeck
+1

On Wed, Feb 8, 2017 at 7:51 PM, Gwen Shapira  wrote:

> +1 (binding)
>
> On Wed, Feb 8, 2017 at 4:45 PM, Steven Schlansker
>  wrote:
> > Hi everyone,
> >
> > Thank you for constructive feedback on KIP-121,
> KStream.peek(ForeachAction) ;
> > it seems like it is time to call a vote which I hope will pass easily :)
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 121%3A+Add+KStream+peek+method
> >
> > I believe the PR attached is already in good shape to consider merging:
> >
> > https://github.com/apache/kafka/pull/2493
> >
> > Thanks!
> > Steven
> >
>
>
>
> --
> Gwen Shapira
> Product Manager | Confluent
> 650.450.2760 | @gwenshap
> Follow us: Twitter | blog
>


Build failed in Jenkins: kafka-trunk-jdk8 #1262

2017-02-08 Thread Apache Jenkins Server
See 

Changes:

[jason] KAFKA-4749; Fix join-time-max and sync-time-max MeasurableStat type

--
[...truncated 4108 lines...]

kafka.security.auth.SimpleAclAuthorizerTest > testLoadCache STARTED

kafka.security.auth.SimpleAclAuthorizerTest > testLoadCache PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testIsrAfterBrokerShutDownAndJoinsBack STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testIsrAfterBrokerShutDownAndJoinsBack PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAutoCreateTopicWithCollision STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAutoCreateTopicWithCollision PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokerListWithNoTopics STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokerListWithNoTopics PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > testAutoCreateTopic STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > testAutoCreateTopic PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > testGetAllTopicMetadata 
STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > testGetAllTopicMetadata 
PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterNewBrokerStartup STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterNewBrokerStartup PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > testBasicTopicMetadata 
STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > testBasicTopicMetadata PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAutoCreateTopicWithInvalidReplication STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAutoCreateTopicWithInvalidReplication PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterABrokerShutdown STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterABrokerShutdown PASSED

kafka.integration.PrimitiveApiTest > testMultiProduce STARTED

kafka.integration.PrimitiveApiTest > testMultiProduce PASSED

kafka.integration.PrimitiveApiTest > testDefaultEncoderProducerAndFetch STARTED

kafka.integration.PrimitiveApiTest > testDefaultEncoderProducerAndFetch PASSED

kafka.integration.PrimitiveApiTest > testFetchRequestCanProperlySerialize 
STARTED

kafka.integration.PrimitiveApiTest > testFetchRequestCanProperlySerialize PASSED

kafka.integration.PrimitiveApiTest > testPipelinedProduceRequests STARTED

kafka.integration.PrimitiveApiTest > testPipelinedProduceRequests PASSED

kafka.integration.PrimitiveApiTest > testProduceAndMultiFetch STARTED

kafka.integration.PrimitiveApiTest > testProduceAndMultiFetch PASSED

kafka.integration.PrimitiveApiTest > 
testDefaultEncoderProducerAndFetchWithCompression STARTED

kafka.integration.PrimitiveApiTest > 
testDefaultEncoderProducerAndFetchWithCompression PASSED

kafka.integration.PrimitiveApiTest > testConsumerEmptyTopic STARTED

kafka.integration.PrimitiveApiTest > testConsumerEmptyTopic PASSED

kafka.integration.PrimitiveApiTest > testEmptyFetchRequest STARTED

kafka.integration.PrimitiveApiTest > testEmptyFetchRequest PASSED

kafka.integration.PlaintextTopicMetadataTest > 
testIsrAfterBrokerShutDownAndJoinsBack STARTED

kafka.integration.PlaintextTopicMetadataTest > 
testIsrAfterBrokerShutDownAndJoinsBack PASSED

kafka.integration.PlaintextTopicMetadataTest > testAutoCreateTopicWithCollision 
STARTED

kafka.integration.PlaintextTopicMetadataTest > testAutoCreateTopicWithCollision 
PASSED

kafka.integration.PlaintextTopicMetadataTest > testAliveBrokerListWithNoTopics 
STARTED

kafka.integration.PlaintextTopicMetadataTest > testAliveBrokerListWithNoTopics 
PASSED

kafka.integration.PlaintextTopicMetadataTest > testAutoCreateTopic STARTED

kafka.integration.PlaintextTopicMetadataTest > testAutoCreateTopic PASSED

kafka.integration.PlaintextTopicMetadataTest > testGetAllTopicMetadata STARTED

kafka.integration.PlaintextTopicMetadataTest > testGetAllTopicMetadata PASSED

kafka.integration.PlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterNewBrokerStartup STARTED

kafka.integration.PlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterNewBrokerStartup PASSED

kafka.integration.PlaintextTopicMetadataTest > testBasicTopicMetadata STARTED

kafka.integration.PlaintextTopicMetadataTest > testBasicTopicMetadata PASSED

kafka.integration.PlaintextTopicMetadataTest > 
testAutoCreateTopicWithInvalidReplication STARTED

kafka.integration.PlaintextTopicMetadataTest > 
testAutoCreateTopicWithInvalidReplication PASSED

kafka.integration.PlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterABrokerShutdown STARTED

kafka.integration.PlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterABrokerShutdown PASSED


Re: [DISCUSS] KIP-117: Add a public AdministrativeClient API for Kafka admin operations

2017-02-08 Thread Dong Lin
I am not aware of any semantic issues that would be caused by sharing a
NetworkClient between producer/consumer and AdminClient. But I agree that
there is currently no good way to share such an internal class between
them. And yes, the goal is to reduce the number of connections. For example,
say we want to enable automatic data purging based on committed offsets using
AdminClient.purgeDataBefore(...) in a stream processing application; then,
in addition to the producer or consumer, we will now have an AdminClient in
every job. It means that the number of connections between server and client
will double.

I have another comment on the KIP. Is the AdminClient API supposed to be
thread-safe? If so, should we mark private variables such as clientTimeoutMs
as @volatile? Would it be a concern if two threads call
TopicsContext.setServerTimeout(...) concurrently to use different timeouts
for their own use cases?

Thanks,
Dong

On Wed, Feb 8, 2017 at 6:50 PM, Jason Gustafson  wrote:

> I'm not too sure sharing NetworkClient is a good idea. The consumer and the
> producer both have request semantics which would be more difficult to
> reason about if the connections are shared with another client. Also, the
> NetworkClient is an internal class which is not really meant for users. Do
> we really want to open that up? Is the only benefit saving the number of
> connections? Seems not worth it in my opinion.
>
> -Jason
>
> On Wed, Feb 8, 2017 at 6:43 PM, Dong Lin  wrote:
>
> > BTW, the idea to share NetworkClient is suggested by Radai and I like
> this
> > idea.
> >
> > On Wed, Feb 8, 2017 at 6:39 PM, Dong Lin  wrote:
> >
> > > Hey Colin,
> > >
> > > Thanks for updating the KIP. I have two followup questions:
> > >
> > > - It seems that setCreationConfig(...) is a bit redundant given that
> most
> > > arguments (e.g. topic name, partition num) are already passed to
> > > TopicsContext.create(...) when user creates topic. Should we pass
> > > the creationConfig as a parameter to TopicsContext.create(..)?
> > >
> > > - I am wondering if we should also specify the constructor of the
> > > AdminClient in the KIP. Previously we agreed that AdminClient should
> have
> > > its own thread to poll NetworkClient to send/receive messages. Should
> we
> > > also allow AdminClient to use an existing NetworkClient that is
> provided
> > to
> > > the constructor? This would allow AdminClient to share NetworkClient
> with
> > > producer or consumer in order to reduce the total number of open
> sockets
> > on
> > > both client and server.
> > >
> > > Thanks,
> > > Dong
> > >
> > > On Wed, Feb 8, 2017 at 5:00 PM, Colin McCabe 
> wrote:
> > >
> > >> Hi all,
> > >>
> > >> I made some major revisions to the proposal on the wiki, so please
> check
> > >> it out.
> > >>
> > >> The new API is based on Ismael's suggestion of grouping related APIs.
> > >> There is only one layer of grouping.  I think that it's actually
> pretty
> > >> intuitive.  It's also based on the idea of using Futures, which
> several
> > >> people commented that they'd like to see.
> > >>
> > >> Here's a simple example:
> > >>
> > >>  > AdminClient client = new AdminClientImpl(myConfig);
> > >>  > try {
> > >>  >   client.topics().create("foo", 3, (short) 2, false).get();
> > >>  >   Collection topicNames = client.topics().list(false).
> get();
> > >>  >   log.info("Found topics: {}", Utils.mkString(topicNames, ", "));
> > >>  >   Collection nodes = client.nodes().list().get();
> > >>  >   log.info("Found cluster nodes: {}", Utils.mkString(nodes, ",
> "));
> > >>  > } finally {
> > >>  >   client.close();
> > >>  > }
> > >>
> > >> The good thing is, there is no Try, no 'get' prefixes, no messing with
> > >> batch APIs.  If there is an error, then Future#get() throws an
> > >> ExecutionException which wraps the relevant exception in the standard
> > >> Java way.
> > >>
> > >> Here's a slightly less simple example:
> > >>
> > >> > AdminClient client = new AdminClientImpl(myConfig);
> > >> > try {
> > >> >   List futures = new LinkedList<>();
> > >> >   for (String topicName: myNewTopicNames) {
> > >> > creations.add(client.topics().
> > >> > setClientTimeout(3).setCreationConfig(myTopicConfig).
> > >> >   create(topicName, 3, (short) 2, false));
> > >> >   }
> > >> >   Futures.waitForAll(futures);
> > >> > } finally {
> > >> >   client.close();
> > >> > }
> > >>
> > >> I went with Futures because I feel like ought to have some option for
> > >> doing async.  It's a style of programming that has become a lot more
> > >> popular with the rise of Node.js, Twisted python, etc. etc.  Also, as
> > >> Ismael commented, Java 8 CompletableFuture is going to make Java's
> > >> support for fluent async programming a lot stronger by allowing call
> > >> chaining and much more.
> > >>
> > >> If we are going to support async, the simplest thing is just to make
> > >> everything return a future and let 

Build failed in Jenkins: kafka-trunk-jdk8 #1261

2017-02-08 Thread Apache Jenkins Server
See 

Changes:

[junrao] MINOR: changes to the production broker configuration docs.

--
[...truncated 4106 lines...]

kafka.security.auth.SimpleAclAuthorizerTest > testLoadCache STARTED

kafka.security.auth.SimpleAclAuthorizerTest > testLoadCache PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testIsrAfterBrokerShutDownAndJoinsBack STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testIsrAfterBrokerShutDownAndJoinsBack PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAutoCreateTopicWithCollision STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAutoCreateTopicWithCollision PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokerListWithNoTopics STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokerListWithNoTopics PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > testAutoCreateTopic STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > testAutoCreateTopic PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > testGetAllTopicMetadata 
STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > testGetAllTopicMetadata 
PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterNewBrokerStartup STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterNewBrokerStartup PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > testBasicTopicMetadata 
STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > testBasicTopicMetadata PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAutoCreateTopicWithInvalidReplication STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAutoCreateTopicWithInvalidReplication PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterABrokerShutdown STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterABrokerShutdown PASSED

kafka.integration.PrimitiveApiTest > testMultiProduce STARTED

kafka.integration.PrimitiveApiTest > testMultiProduce PASSED

kafka.integration.PrimitiveApiTest > testDefaultEncoderProducerAndFetch STARTED

kafka.integration.PrimitiveApiTest > testDefaultEncoderProducerAndFetch PASSED

kafka.integration.PrimitiveApiTest > testFetchRequestCanProperlySerialize 
STARTED

kafka.integration.PrimitiveApiTest > testFetchRequestCanProperlySerialize PASSED

kafka.integration.PrimitiveApiTest > testPipelinedProduceRequests STARTED

kafka.integration.PrimitiveApiTest > testPipelinedProduceRequests PASSED

kafka.integration.PrimitiveApiTest > testProduceAndMultiFetch STARTED

kafka.integration.PrimitiveApiTest > testProduceAndMultiFetch PASSED

kafka.integration.PrimitiveApiTest > 
testDefaultEncoderProducerAndFetchWithCompression STARTED

kafka.integration.PrimitiveApiTest > 
testDefaultEncoderProducerAndFetchWithCompression PASSED

kafka.integration.PrimitiveApiTest > testConsumerEmptyTopic STARTED

kafka.integration.PrimitiveApiTest > testConsumerEmptyTopic PASSED

kafka.integration.PrimitiveApiTest > testEmptyFetchRequest STARTED

kafka.integration.PrimitiveApiTest > testEmptyFetchRequest PASSED

kafka.integration.PlaintextTopicMetadataTest > 
testIsrAfterBrokerShutDownAndJoinsBack STARTED

kafka.integration.PlaintextTopicMetadataTest > 
testIsrAfterBrokerShutDownAndJoinsBack PASSED

kafka.integration.PlaintextTopicMetadataTest > testAutoCreateTopicWithCollision 
STARTED

kafka.integration.PlaintextTopicMetadataTest > testAutoCreateTopicWithCollision 
PASSED

kafka.integration.PlaintextTopicMetadataTest > testAliveBrokerListWithNoTopics 
STARTED

kafka.integration.PlaintextTopicMetadataTest > testAliveBrokerListWithNoTopics 
PASSED

kafka.integration.PlaintextTopicMetadataTest > testAutoCreateTopic STARTED

kafka.integration.PlaintextTopicMetadataTest > testAutoCreateTopic PASSED

kafka.integration.PlaintextTopicMetadataTest > testGetAllTopicMetadata STARTED

kafka.integration.PlaintextTopicMetadataTest > testGetAllTopicMetadata PASSED

kafka.integration.PlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterNewBrokerStartup STARTED

kafka.integration.PlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterNewBrokerStartup PASSED

kafka.integration.PlaintextTopicMetadataTest > testBasicTopicMetadata STARTED

kafka.integration.PlaintextTopicMetadataTest > testBasicTopicMetadata PASSED

kafka.integration.PlaintextTopicMetadataTest > 
testAutoCreateTopicWithInvalidReplication STARTED

kafka.integration.PlaintextTopicMetadataTest > 
testAutoCreateTopicWithInvalidReplication PASSED

kafka.integration.PlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterABrokerShutdown STARTED

kafka.integration.PlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterABrokerShutdown PASSED

kafka.integration.MetricsDuringTopicCreationDeletionTest 

Re: [DISCUSS] KIP-117: Add a public AdministrativeClient API for Kafka admin operations

2017-02-08 Thread Jason Gustafson
I'm not too sure sharing NetworkClient is a good idea. The consumer and the
producer both have request semantics which would be more difficult to
reason about if the connections are shared with another client. Also, the
NetworkClient is an internal class which is not really meant for users. Do
we really want to open that up? Is the only benefit saving the number of
connections? Seems not worth it in my opinion.

-Jason

On Wed, Feb 8, 2017 at 6:43 PM, Dong Lin  wrote:

> BTW, the idea to share NetworkClient is suggested by Radai and I like this
> idea.
>
> On Wed, Feb 8, 2017 at 6:39 PM, Dong Lin  wrote:
>
> > Hey Colin,
> >
> > Thanks for updating the KIP. I have two followup questions:
> >
> > - It seems that setCreationConfig(...) is a bit redundant given that most
> > arguments (e.g. topic name, partition num) are already passed to
> > TopicsContext.create(...) when user creates topic. Should we pass
> > the creationConfig as a parameter to TopicsContext.create(..)?
> >
> > - I am wondering if we should also specify the constructor of the
> > AdminClient in the KIP. Previously we agreed that AdminClient should have
> > its own thread to poll NetworkClient to send/receive messages. Should we
> > also allow AdminClient to use an existing NetworkClient that is provided
> to
> > the constructor? This would allow AdminClient to share NetworkClient with
> > producer or consumer in order to reduce the total number of open sockets
> on
> > both client and server.
> >
> > Thanks,
> > Dong
> >
> > On Wed, Feb 8, 2017 at 5:00 PM, Colin McCabe  wrote:
> >
> >> Hi all,
> >>
> >> I made some major revisions to the proposal on the wiki, so please check
> >> it out.
> >>
> >> The new API is based on Ismael's suggestion of grouping related APIs.
> >> There is only one layer of grouping.  I think that it's actually pretty
> >> intuitive.  It's also based on the idea of using Futures, which several
> >> people commented that they'd like to see.
> >>
> >> Here's a simple example:
> >>
> >>  > AdminClient client = new AdminClientImpl(myConfig);
> >>  > try {
> >>  >   client.topics().create("foo", 3, (short) 2, false).get();
> >>  >   Collection topicNames = client.topics().list(false).get();
> >>  >   log.info("Found topics: {}", Utils.mkString(topicNames, ", "));
> >>  >   Collection nodes = client.nodes().list().get();
> >>  >   log.info("Found cluster nodes: {}", Utils.mkString(nodes, ", "));
> >>  > } finally {
> >>  >   client.close();
> >>  > }
> >>
> >> The good thing is, there is no Try, no 'get' prefixes, no messing with
> >> batch APIs.  If there is an error, then Future#get() throws an
> >> ExecutionException which wraps the relevant exception in the standard
> >> Java way.
> >>
> >> Here's a slightly less simple example:
> >>
> >> > AdminClient client = new AdminClientImpl(myConfig);
> >> > try {
> >> >   List futures = new LinkedList<>();
> >> >   for (String topicName: myNewTopicNames) {
> >> > creations.add(client.topics().
> >> > setClientTimeout(3).setCreationConfig(myTopicConfig).
> >> >   create(topicName, 3, (short) 2, false));
> >> >   }
> >> >   Futures.waitForAll(futures);
> >> > } finally {
> >> >   client.close();
> >> > }
> >>
> >> I went with Futures because I feel like ought to have some option for
> >> doing async.  It's a style of programming that has become a lot more
> >> popular with the rise of Node.js, Twisted python, etc. etc.  Also, as
> >> Ismael commented, Java 8 CompletableFuture is going to make Java's
> >> support for fluent async programming a lot stronger by allowing call
> >> chaining and much more.
> >>
> >> If we are going to support async, the simplest thing is just to make
> >> everything return a future and let people call get() if they want to run
> >> synchronously.  Having a mix of async and sync APIs is just going to be
> >> confusing and redundant.
> >>
> >> I think we should try to avoid creating single functions that start
> >> multiple requests if we can.  It makes things much uglier.  It means
> >> that you have to have some kind of request class that wraps up the
> >> request the user is trying to create, so that you can handle an array of
> >> those requests.  The return value has to be something like Map >> Try> to represent which nodes failed and succeeded.  This is the
> >> kind of stuff that, in my opinion, makes people scratch their heads.
> >>
> >> If we need to, we can still get some of the efficiency benefits of batch
> >> APIs by waiting for a millisecond or two before sending out a topic
> >> create() request to see if other create() requests arrive.  If so, we
> >> can coalesce them.  It might be better to figure out if this is an
> >> actual performance issue before implementing it, though.
> >>
> >> I think it would be good to get something out there, annotate it as
> >> @Unstable, and get feedback from people building against trunk and using
> >> 

Re: [DISCUSS] KIP-117: Add a public AdministrativeClient API for Kafka admin operations

2017-02-08 Thread Dong Lin
BTW, the idea to share NetworkClient is suggested by Radai and I like this
idea.

On Wed, Feb 8, 2017 at 6:39 PM, Dong Lin  wrote:

> Hey Colin,
>
> Thanks for updating the KIP. I have two followup questions:
>
> - It seems that setCreationConfig(...) is a bit redundant given that most
> arguments (e.g. topic name, partition num) are already passed to
> TopicsContext.create(...) when user creates topic. Should we pass
> the creationConfig as a parameter to TopicsContext.create(..)?
>
> - I am wondering if we should also specify the constructor of the
> AdminClient in the KIP. Previously we agreed that AdminClient should have
> its own thread to poll NetworkClient to send/receive messages. Should we
> also allow AdminClient to use an existing NetworkClient that is provided to
> the constructor? This would allow AdminClient to share NetworkClient with
> producer or consumer in order to reduce the total number of open sockets on
> both client and server.
>
> Thanks,
> Dong
>
> On Wed, Feb 8, 2017 at 5:00 PM, Colin McCabe  wrote:
>
>> Hi all,
>>
>> I made some major revisions to the proposal on the wiki, so please check
>> it out.
>>
>> The new API is based on Ismael's suggestion of grouping related APIs.
>> There is only one layer of grouping.  I think that it's actually pretty
>> intuitive.  It's also based on the idea of using Futures, which several
>> people commented that they'd like to see.
>>
>> Here's a simple example:
>>
>>  > AdminClient client = new AdminClientImpl(myConfig);
>>  > try {
>>  >   client.topics().create("foo", 3, (short) 2, false).get();
>>  >   Collection topicNames = client.topics().list(false).get();
>>  >   log.info("Found topics: {}", Utils.mkString(topicNames, ", "));
>>  >   Collection nodes = client.nodes().list().get();
>>  >   log.info("Found cluster nodes: {}", Utils.mkString(nodes, ", "));
>>  > } finally {
>>  >   client.close();
>>  > }
>>
>> The good thing is, there is no Try, no 'get' prefixes, no messing with
>> batch APIs.  If there is an error, then Future#get() throws an
>> ExecutionException which wraps the relevant exception in the standard
>> Java way.
>>
>> Here's a slightly less simple example:
>>
>> > AdminClient client = new AdminClientImpl(myConfig);
>> > try {
>> >   List futures = new LinkedList<>();
>> >   for (String topicName: myNewTopicNames) {
>> > creations.add(client.topics().
>> > setClientTimeout(3).setCreationConfig(myTopicConfig).
>> >   create(topicName, 3, (short) 2, false));
>> >   }
>> >   Futures.waitForAll(futures);
>> > } finally {
>> >   client.close();
>> > }
>>
>> I went with Futures because I feel like ought to have some option for
>> doing async.  It's a style of programming that has become a lot more
>> popular with the rise of Node.js, Twisted python, etc. etc.  Also, as
>> Ismael commented, Java 8 CompletableFuture is going to make Java's
>> support for fluent async programming a lot stronger by allowing call
>> chaining and much more.
>>
>> If we are going to support async, the simplest thing is just to make
>> everything return a future and let people call get() if they want to run
>> synchronously.  Having a mix of async and sync APIs is just going to be
>> confusing and redundant.
>>
>> I think we should try to avoid creating single functions that start
>> multiple requests if we can.  It makes things much uglier.  It means
>> that you have to have some kind of request class that wraps up the
>> request the user is trying to create, so that you can handle an array of
>> those requests.  The return value has to be something like Map> Try> to represent which nodes failed and succeeded.  This is the
>> kind of stuff that, in my opinion, makes people scratch their heads.
>>
>> If we need to, we can still get some of the efficiency benefits of batch
>> APIs by waiting for a millisecond or two before sending out a topic
>> create() request to see if other create() requests arrive.  If so, we
>> can coalesce them.  It might be better to figure out if this is an
>> actual performance issue before implementing it, though.
>>
>> I think it would be good to get something out there, annotate it as
>> @Unstable, and get feedback from people building against trunk and using
>> it.  We have removed or changed @Unstable APIs in streams before, so I
>> don't think we should worry that it will get set in stone prematurely.
>> The AdminClient API should get much less developer use than anything in
>> streams, so changing an unstable API should be much easier.
>>
>> best,
>> Colin
>>
>>
>> On Wed, Feb 8, 2017, at 07:49, Ismael Juma wrote:
>> > Thanks for elaborating Jay. I totally agree that we have to be very
>> > careful
>> > in how we use our complexity budget. Easier said than done when people
>> > don't agree on what is complex and what is simple. :) For example, I
>> > think
>> > batch APIs are a significant source of complexity as you have to do a
>> > bunch

[jira] [Commented] (KAFKA-4749) fix join-time-max and sync-time-max MeasurableStat type

2017-02-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4749?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15858907#comment-15858907
 ] 

ASF GitHub Bot commented on KAFKA-4749:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2520


> fix join-time-max and sync-time-max MeasurableStat type
> ---
>
> Key: KAFKA-4749
> URL: https://issues.apache.org/jira/browse/KAFKA-4749
> Project: Kafka
>  Issue Type: Bug
>Reporter: Onur Karaman
>Assignee: Onur Karaman
> Fix For: 0.10.3.0
>
>
> GroupCoordinatorMetrics currently sets up join-time-max and sync-time-max 
> incorrectly as a "new Avg()" MeasurableStat instead of "new Max()"



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] kafka pull request #2520: KAFKA-4749: fix join-time-max and sync-time-max Me...

2017-02-08 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2520


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Resolved] (KAFKA-4749) fix join-time-max and sync-time-max MeasurableStat type

2017-02-08 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4749?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-4749.

   Resolution: Fixed
Fix Version/s: 0.10.3.0

Issue resolved by pull request 2520
[https://github.com/apache/kafka/pull/2520]

> fix join-time-max and sync-time-max MeasurableStat type
> ---
>
> Key: KAFKA-4749
> URL: https://issues.apache.org/jira/browse/KAFKA-4749
> Project: Kafka
>  Issue Type: Bug
>Reporter: Onur Karaman
>Assignee: Onur Karaman
> Fix For: 0.10.3.0
>
>
> GroupCoordinatorMetrics currently sets up join-time-max and sync-time-max 
> incorrectly as a "new Avg()" MeasurableStat instead of "new Max()"



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [DISCUSS] KIP-117: Add a public AdministrativeClient API for Kafka admin operations

2017-02-08 Thread Dong Lin
Hey Colin,

Thanks for updating the KIP. I have two followup questions:

- It seems that setCreationConfig(...) is a bit redundant given that most
arguments (e.g. topic name, partition num) are already passed to
TopicsContext.create(...) when user creates topic. Should we pass
the creationConfig as a parameter to TopicsContext.create(..)?

- I am wondering if we should also specify the constructor of the
AdminClient in the KIP. Previously we agreed that AdminClient should have
its own thread to poll NetworkClient to send/receive messages. Should we
also allow AdminClient to use an existing NetworkClient that is provided to
the constructor? This would allow AdminClient to share NetworkClient with
producer or consumer in order to reduce the total number of open sockets on
both client and server.

Thanks,
Dong

On Wed, Feb 8, 2017 at 5:00 PM, Colin McCabe  wrote:

> Hi all,
>
> I made some major revisions to the proposal on the wiki, so please check
> it out.
>
> The new API is based on Ismael's suggestion of grouping related APIs.
> There is only one layer of grouping.  I think that it's actually pretty
> intuitive.  It's also based on the idea of using Futures, which several
> people commented that they'd like to see.
>
> Here's a simple example:
>
>  > AdminClient client = new AdminClientImpl(myConfig);
>  > try {
>  >   client.topics().create("foo", 3, (short) 2, false).get();
>  >   Collection topicNames = client.topics().list(false).get();
>  >   log.info("Found topics: {}", Utils.mkString(topicNames, ", "));
>  >   Collection nodes = client.nodes().list().get();
>  >   log.info("Found cluster nodes: {}", Utils.mkString(nodes, ", "));
>  > } finally {
>  >   client.close();
>  > }
>
> The good thing is, there is no Try, no 'get' prefixes, no messing with
> batch APIs.  If there is an error, then Future#get() throws an
> ExecutionException which wraps the relevant exception in the standard
> Java way.
>
> Here's a slightly less simple example:
>
> > AdminClient client = new AdminClientImpl(myConfig);
> > try {
> >   List futures = new LinkedList<>();
> >   for (String topicName: myNewTopicNames) {
> > creations.add(client.topics().
> > setClientTimeout(3).setCreationConfig(myTopicConfig).
> >   create(topicName, 3, (short) 2, false));
> >   }
> >   Futures.waitForAll(futures);
> > } finally {
> >   client.close();
> > }
>
> I went with Futures because I feel like ought to have some option for
> doing async.  It's a style of programming that has become a lot more
> popular with the rise of Node.js, Twisted python, etc. etc.  Also, as
> Ismael commented, Java 8 CompletableFuture is going to make Java's
> support for fluent async programming a lot stronger by allowing call
> chaining and much more.
>
> If we are going to support async, the simplest thing is just to make
> everything return a future and let people call get() if they want to run
> synchronously.  Having a mix of async and sync APIs is just going to be
> confusing and redundant.
>
> I think we should try to avoid creating single functions that start
> multiple requests if we can.  It makes things much uglier.  It means
> that you have to have some kind of request class that wraps up the
> request the user is trying to create, so that you can handle an array of
> those requests.  The return value has to be something like Map Try> to represent which nodes failed and succeeded.  This is the
> kind of stuff that, in my opinion, makes people scratch their heads.
>
> If we need to, we can still get some of the efficiency benefits of batch
> APIs by waiting for a millisecond or two before sending out a topic
> create() request to see if other create() requests arrive.  If so, we
> can coalesce them.  It might be better to figure out if this is an
> actual performance issue before implementing it, though.
>
> I think it would be good to get something out there, annotate it as
> @Unstable, and get feedback from people building against trunk and using
> it.  We have removed or changed @Unstable APIs in streams before, so I
> don't think we should worry that it will get set in stone prematurely.
> The AdminClient API should get much less developer use than anything in
> streams, so changing an unstable API should be much easier.
>
> best,
> Colin
>
>
> On Wed, Feb 8, 2017, at 07:49, Ismael Juma wrote:
> > Thanks for elaborating Jay. I totally agree that we have to be very
> > careful
> > in how we use our complexity budget. Easier said than done when people
> > don't agree on what is complex and what is simple. :) For example, I
> > think
> > batch APIs are a significant source of complexity as you have to do a
> > bunch
> > of ceremony to group things before sending the request and error handling
> > becomes more complex due to partial failures (things like `Try` or other
> > mechanisms that serve a similar role are then needed).
> >
> > Maybe a way forward is to write API usage examples to 

[GitHub] kafka pull request #2519: MINOR: changes to the production broker configurat...

2017-02-08 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2519


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-4749) fix join-time-max and sync-time-max MeasurableStat type

2017-02-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4749?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15858816#comment-15858816
 ] 

ASF GitHub Bot commented on KAFKA-4749:
---

GitHub user onurkaraman opened a pull request:

https://github.com/apache/kafka/pull/2520

KAFKA-4749: fix join-time-max and sync-time-max MeasurableStat type

GroupCoordinatorMetrics currently sets up join-time-max and sync-time-max 
incorrectly as a "new Avg()" MeasurableStat instead of "new Max()"

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/onurkaraman/kafka KAFKA-4749

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2520.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2520


commit f2db0c59690ebb2db2e29d6a89718991ef862f05
Author: Onur Karaman 
Date:   2017-02-09T01:10:32Z

KAFKA-4749: fix join-time-max and sync-time-max MeasurableStat type




> fix join-time-max and sync-time-max MeasurableStat type
> ---
>
> Key: KAFKA-4749
> URL: https://issues.apache.org/jira/browse/KAFKA-4749
> Project: Kafka
>  Issue Type: Bug
>Reporter: Onur Karaman
>Assignee: Onur Karaman
>
> GroupCoordinatorMetrics currently sets up join-time-max and sync-time-max 
> incorrectly as a "new Avg()" MeasurableStat instead of "new Max()"



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] kafka pull request #2520: KAFKA-4749: fix join-time-max and sync-time-max Me...

2017-02-08 Thread onurkaraman
GitHub user onurkaraman opened a pull request:

https://github.com/apache/kafka/pull/2520

KAFKA-4749: fix join-time-max and sync-time-max MeasurableStat type

GroupCoordinatorMetrics currently sets up join-time-max and sync-time-max 
incorrectly as a "new Avg()" MeasurableStat instead of "new Max()"

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/onurkaraman/kafka KAFKA-4749

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2520.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2520


commit f2db0c59690ebb2db2e29d6a89718991ef862f05
Author: Onur Karaman 
Date:   2017-02-09T01:10:32Z

KAFKA-4749: fix join-time-max and sync-time-max MeasurableStat type






[jira] [Created] (KAFKA-4749) fix join-time-max and sync-time-max MeasurableStat type

2017-02-08 Thread Onur Karaman (JIRA)
Onur Karaman created KAFKA-4749:
---

 Summary: fix join-time-max and sync-time-max MeasurableStat type
 Key: KAFKA-4749
 URL: https://issues.apache.org/jira/browse/KAFKA-4749
 Project: Kafka
  Issue Type: Bug
Reporter: Onur Karaman
Assignee: Onur Karaman


GroupCoordinatorMetrics currently sets up join-time-max and sync-time-max 
incorrectly as a "new Avg()" MeasurableStat instead of "new Max()"
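
For illustration, the fix boils down to which MeasurableStat gets registered on 
the sensor. A hypothetical sketch against the common metrics API (the sensor 
name, metric group and descriptions below are made up):

{code}
Metrics metrics = new Metrics();
Sensor joinSensor = metrics.sensor("JoinGroup");

// join-time-avg is correctly an Avg()
joinSensor.add(metrics.metricName("join-time-avg", "group-coordinator-metrics",
    "Average time it took a group to complete a join"), new Avg());

// join-time-max must be a Max(); registering it as new Avg() (the current bug)
// silently reports an average under a "-max" name
joinSensor.add(metrics.metricName("join-time-max", "group-coordinator-metrics",
    "Max time it took a group to complete a join"), new Max());

joinSensor.record(42.0);   // each recorded value feeds both stats
{code}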



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [DISCUSS] KIP-117: Add a public AdministrativeClient API for Kafka admin operations

2017-02-08 Thread Colin McCabe
Hi all,

I made some major revisions to the proposal on the wiki, so please check
it out.

The new API is based on Ismael's suggestion of grouping related APIs. 
There is only one layer of grouping.  I think that it's actually pretty
intuitive.  It's also based on the idea of using Futures, which several
people commented that they'd like to see.

Here's a simple example:

 > AdminClient client = new AdminClientImpl(myConfig);
 > try {
 >   client.topics().create("foo", 3, (short) 2, false).get();
 >   Collection<String> topicNames = client.topics().list(false).get();
 >   log.info("Found topics: {}", Utils.mkString(topicNames, ", "));
 >   Collection<Node> nodes = client.nodes().list().get();
 >   log.info("Found cluster nodes: {}", Utils.mkString(nodes, ", "));
 > } finally {
 >   client.close();
 > }

The good thing is, there is no Try, no 'get' prefixes, no messing with
batch APIs.  If there is an error, then Future#get() throws an
ExecutionException which wraps the relevant exception in the standard
Java way.
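
For example, a caller that wants to treat "topic already exists" as a no-op can
just unwrap the cause (a sketch reusing the client and log from above, and
assuming the existing TopicExistsException error class):

 > try {
 >   client.topics().create("foo", 3, (short) 2, false).get();
 > } catch (ExecutionException e) {
 >   if (e.getCause() instanceof TopicExistsException) {
 >     log.info("Topic foo already exists; nothing to do.");
 >   } else {
 >     throw e;
 >   }
 > }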

Here's a slightly less simple example:

> AdminClient client = new AdminClientImpl(myConfig);
> try {
>   List<Future> futures = new LinkedList<>();
>   for (String topicName: myNewTopicNames) {
>     futures.add(client.topics().
>       setClientTimeout(3).setCreationConfig(myTopicConfig).
>         create(topicName, 3, (short) 2, false));
>   }
>   Futures.waitForAll(futures);
> } finally {
>   client.close();
> }

I went with Futures because I feel like we ought to have some option for
doing async.  It's a style of programming that has become a lot more
popular with the rise of Node.js, Twisted python, etc. etc.  Also, as
Ismael commented, Java 8 CompletableFuture is going to make Java's
support for fluent async programming a lot stronger by allowing call
chaining and much more.

If we are going to support async, the simplest thing is just to make
everything return a future and let people call get() if they want to run
synchronously.  Having a mix of async and sync APIs is just going to be
confusing and redundant.

I think we should try to avoid creating single functions that start
multiple requests if we can.  It makes things much uglier.  It means
that you have to have some kind of request class that wraps up the
request the user is trying to create, so that you can handle an array of
those requests.  The return value has to be something like a Map to represent which nodes failed and succeeded.  This is the
kind of stuff that, in my opinion, makes people scratch their heads.

If we need to, we can still get some of the efficiency benefits of batch
APIs by waiting for a millisecond or two before sending out a topic
create() request to see if other create() requests arrive.  If so, we
can coalesce them.  It might be better to figure out if this is an
actual performance issue before implementing it, though.
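
If we did add it later, the sketch is small: queue up the requests, give other
callers a millisecond to show up, and drain everything into one batched
request. TopicCreation and sendCreateTopicsRequest below are just placeholders,
not part of the proposal:

> BlockingQueue<TopicCreation> pending = new LinkedBlockingQueue<>();
>
> // background sender loop
> while (true) {
>   List<TopicCreation> batch = new ArrayList<>();
>   batch.add(pending.take());       // block until at least one request arrives
>   Thread.sleep(1);                 // give other create() calls ~1ms to arrive
>   pending.drainTo(batch);          // coalesce whatever queued up in the meantime
>   sendCreateTopicsRequest(batch);  // one wire request for the whole batch
> }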

I think it would be good to get something out there, annotate it as
@Unstable, and get feedback from people building against trunk and using
it.  We have removed or changed @Unstable APIs in streams before, so I
don't think we should worry that it will get set in stone prematurely. 
The AdminClient API should get much less developer use than anything in
streams, so changing an unstable API should be much easier.

best,
Colin


On Wed, Feb 8, 2017, at 07:49, Ismael Juma wrote:
> Thanks for elaborating Jay. I totally agree that we have to be very
> careful
> in how we use our complexity budget. Easier said than done when people
> don't agree on what is complex and what is simple. :) For example, I
> think
> batch APIs are a significant source of complexity as you have to do a
> bunch
> of ceremony to group things before sending the request and error handling
> becomes more complex due to partial failures (things like `Try` or other
> mechanisms that serve a similar role are then needed).
> 
> Maybe a way forward is to write API usage examples to help validate that
> the suggested API is indeed easy to use.
> 
> Ismael
> 
> On Wed, Feb 8, 2017 at 4:40 AM, Jay Kreps  wrote:
> 
> > Totally agree on CompletableFuture. Also agree with some of the rough edges
> > on the Consumer.
> >
> > I don't have much of a leg to stand on with the splitting vs not splitting
> > thing, really hard to argue one or the other is better. I guess the one
> > observation in watching us try to make good public apis over the years is I
> > am kind of in favor of a particular kind of simple. In particular I think
> > since the bar is sooo high in support and docs and the community of users
> > so broad in the range of their capabilities, it makes it so there is a lot
> > of value in dead simple interfaces that don't have a lot of conceptual
> > weight, don't introduce a lot of new classes or concepts or general
> > patterns that must be understood to use them correctly. So things like
> > nesting, or the Try class, or async apis, or even just a complex set of
> > classes representing arguments or 

[GitHub] kafka pull request #2519: MINOR: changes to the production broker configurat...

2017-02-08 Thread alexlod
GitHub user alexlod opened a pull request:

https://github.com/apache/kafka/pull/2519

MINOR: changes to the production broker configuration docs.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/alexlod/kafka production-config-docs

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2519.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2519


commit 10b2daf0c16443ea4369bfe68c45b589327bc557
Author: Alex Loddengaard 
Date:   2017-02-09T00:55:53Z

MINOR: changes to the production broker configuration docs.






[jira] [Commented] (KAFKA-4747) add metrics for KafkaConsumer.poll

2017-02-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4747?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15858800#comment-15858800
 ] 

ASF GitHub Bot commented on KAFKA-4747:
---

GitHub user onurkaraman opened a pull request:

https://github.com/apache/kafka/pull/2518

KAFKA-4747: add metrics for KafkaConsumer.poll

KafkaConsumer heavily depends on KafkaConsumer.poll yet we don't have 
metrics directly associated with it.

We probably want to add two metrics:
1. time spent in KafkaConsumer.poll
2. time since last KafkaConsumer.poll (measured as now - endTimeOfLastPoll)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/onurkaraman/kafka KAFKA-4747

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2518.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2518


commit 287a25e84f2eb87793478f0a28fa1361707b2754
Author: Onur Karaman 
Date:   2017-02-09T00:55:12Z

KAFKA-4747: add metrics for KafkaConsumer.poll




> add metrics for KafkaConsumer.poll
> --
>
> Key: KAFKA-4747
> URL: https://issues.apache.org/jira/browse/KAFKA-4747
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Onur Karaman
>Assignee: Onur Karaman
>
> KafkaConsumer heavily depends on KafkaConsumer.poll yet we don't have metrics 
> directly associated with it.
> We probably want to add two metrics:
> 1. time spent in KafkaConsumer.poll
> 2. time since last KafkaConsumer.poll (measured as now - endTimeOfLastPoll)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] kafka pull request #2518: KAFKA-4747: add metrics for KafkaConsumer.poll

2017-02-08 Thread onurkaraman
GitHub user onurkaraman opened a pull request:

https://github.com/apache/kafka/pull/2518

KAFKA-4747: add metrics for KafkaConsumer.poll

KafkaConsumer heavily depends on KafkaConsumer.poll yet we don't have 
metrics directly associated with it.

We probably want to add two metrics:
1. time spent in KafkaConsumer.poll
2. time since last KafkaConsumer.poll (measured as now - endTimeOfLastPoll)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/onurkaraman/kafka KAFKA-4747

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2518.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2518


commit 287a25e84f2eb87793478f0a28fa1361707b2754
Author: Onur Karaman 
Date:   2017-02-09T00:55:12Z

KAFKA-4747: add metrics for KafkaConsumer.poll






Re: KIP-121 [VOTE]: Add KStream peek method

2017-02-08 Thread Gwen Shapira
+1 (binding)

On Wed, Feb 8, 2017 at 4:45 PM, Steven Schlansker
 wrote:
> Hi everyone,
>
> Thank you for the constructive feedback on KIP-121, KStream.peek(ForeachAction<? super K, ? super V>);
> it seems like it is time to call a vote which I hope will pass easily :)
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-121%3A+Add+KStream+peek+method
>
> I believe the PR attached is already in good shape to consider merging:
>
> https://github.com/apache/kafka/pull/2493
>
> Thanks!
> Steven
>



-- 
Gwen Shapira
Product Manager | Confluent
650.450.2760 | @gwenshap
Follow us: Twitter | blog


KIP-121 [VOTE]: Add KStream peek method

2017-02-08 Thread Steven Schlansker
Hi everyone,

Thank you for the constructive feedback on KIP-121, KStream.peek(ForeachAction<? super K, ? super V>);
it seems like it is time to call a vote which I hope will pass easily :)

https://cwiki.apache.org/confluence/display/KAFKA/KIP-121%3A+Add+KStream+peek+method

I believe the PR attached is already in good shape to consider merging:

https://github.com/apache/kafka/pull/2493

Thanks!
Steven





Build failed in Jenkins: kafka-trunk-jdk8 #1260

2017-02-08 Thread Apache Jenkins Server
See 

Changes:

[wangguoz] HOTFIX: renamed test so it is picked up by ducktape

[wangguoz] MINOR: Add logging when commitSync fails in StreamTask

[wangguoz] KAFKA-4702: Parametrize streams benchmarks to run at scale

--
[...truncated 29223 lines...]
org.apache.kafka.common.security.scram.ScramFormatterTest > saslName PASSED

org.apache.kafka.common.security.scram.ScramMessagesTest > 
validClientFirstMessage STARTED

org.apache.kafka.common.security.scram.ScramMessagesTest > 
validClientFirstMessage PASSED

org.apache.kafka.common.security.scram.ScramMessagesTest > 
invalidClientFinalMessage STARTED

org.apache.kafka.common.security.scram.ScramMessagesTest > 
invalidClientFinalMessage PASSED

org.apache.kafka.common.security.scram.ScramMessagesTest > 
validServerFirstMessage STARTED

org.apache.kafka.common.security.scram.ScramMessagesTest > 
validServerFirstMessage PASSED

org.apache.kafka.common.security.scram.ScramMessagesTest > 
invalidServerFinalMessage STARTED

org.apache.kafka.common.security.scram.ScramMessagesTest > 
invalidServerFinalMessage PASSED

org.apache.kafka.common.security.scram.ScramMessagesTest > 
invalidClientFirstMessage STARTED

org.apache.kafka.common.security.scram.ScramMessagesTest > 
invalidClientFirstMessage PASSED

org.apache.kafka.common.security.scram.ScramMessagesTest > 
validClientFinalMessage STARTED

org.apache.kafka.common.security.scram.ScramMessagesTest > 
validClientFinalMessage PASSED

org.apache.kafka.common.security.scram.ScramMessagesTest > 
invalidServerFirstMessage STARTED

org.apache.kafka.common.security.scram.ScramMessagesTest > 
invalidServerFirstMessage PASSED

org.apache.kafka.common.security.scram.ScramMessagesTest > 
validServerFinalMessage STARTED

org.apache.kafka.common.security.scram.ScramMessagesTest > 
validServerFinalMessage PASSED

org.apache.kafka.common.security.ssl.SslFactoryTest > testClientMode STARTED

org.apache.kafka.common.security.ssl.SslFactoryTest > testClientMode PASSED

org.apache.kafka.common.security.ssl.SslFactoryTest > 
testSslFactoryConfiguration STARTED

org.apache.kafka.common.security.ssl.SslFactoryTest > 
testSslFactoryConfiguration PASSED

org.apache.kafka.common.security.kerberos.KerberosNameTest > testParse STARTED

org.apache.kafka.common.security.kerberos.KerberosNameTest > testParse PASSED

org.apache.kafka.common.security.auth.KafkaPrincipalTest > 
testPrincipalNameCanContainSeparator STARTED

org.apache.kafka.common.security.auth.KafkaPrincipalTest > 
testPrincipalNameCanContainSeparator PASSED

org.apache.kafka.common.security.auth.KafkaPrincipalTest > 
testEqualsAndHashCode STARTED

org.apache.kafka.common.security.auth.KafkaPrincipalTest > 
testEqualsAndHashCode PASSED

org.apache.kafka.common.security.JaasContextTest > 
testLoadForServerWithListenerNameOverride STARTED

org.apache.kafka.common.security.JaasContextTest > 
testLoadForServerWithListenerNameOverride PASSED

org.apache.kafka.common.security.JaasContextTest > testMissingOptionValue 
STARTED

org.apache.kafka.common.security.JaasContextTest > testMissingOptionValue PASSED

org.apache.kafka.common.security.JaasContextTest > testSingleOption STARTED

org.apache.kafka.common.security.JaasContextTest > testSingleOption PASSED

org.apache.kafka.common.security.JaasContextTest > 
testNumericOptionWithoutQuotes STARTED

org.apache.kafka.common.security.JaasContextTest > 
testNumericOptionWithoutQuotes PASSED

org.apache.kafka.common.security.JaasContextTest > testConfigNoOptions STARTED

org.apache.kafka.common.security.JaasContextTest > testConfigNoOptions PASSED

org.apache.kafka.common.security.JaasContextTest > 
testLoadForServerWithWrongListenerName STARTED

org.apache.kafka.common.security.JaasContextTest > 
testLoadForServerWithWrongListenerName PASSED

org.apache.kafka.common.security.JaasContextTest > testNumericOptionWithQuotes 
STARTED

org.apache.kafka.common.security.JaasContextTest > testNumericOptionWithQuotes 
PASSED

org.apache.kafka.common.security.JaasContextTest > testQuotedOptionValue STARTED

org.apache.kafka.common.security.JaasContextTest > testQuotedOptionValue PASSED

org.apache.kafka.common.security.JaasContextTest > testMissingLoginModule 
STARTED

org.apache.kafka.common.security.JaasContextTest > testMissingLoginModule PASSED

org.apache.kafka.common.security.JaasContextTest > testMissingSemicolon STARTED

org.apache.kafka.common.security.JaasContextTest > testMissingSemicolon PASSED

org.apache.kafka.common.security.JaasContextTest > testMultipleOptions STARTED

org.apache.kafka.common.security.JaasContextTest > testMultipleOptions PASSED

org.apache.kafka.common.security.JaasContextTest > 
testLoadForClientWithListenerName STARTED

org.apache.kafka.common.security.JaasContextTest > 
testLoadForClientWithListenerName PASSED

org.apache.kafka.common.security.JaasContextTest > testMultipleLoginModules 
STARTED


[jira] [Commented] (KAFKA-4746) Offsets can be committed for the offsets topic

2017-02-08 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4746?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15858753#comment-15858753
 ] 

Jason Gustafson commented on KAFKA-4746:


Thanks. I was thinking of cases where you're materializing to a persistent data 
store. In any case, I agree it's uncommon.

One minor suggestion on this ticket: in addition to checking on the broker, it 
might make sense to check on the client prior to sending the offset commit. 
That might let us provide a more helpful message. 
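
Something like the following sketch would be enough to fail fast with a clear 
error (illustrative only, not tied to exactly where it would live in the 
consumer code):

{code}
// hypothetical client-side guard, called before building the OffsetCommitRequest
private void ensureNotInternalOffsetsTopic(Map<TopicPartition, OffsetAndMetadata> offsets) {
    for (TopicPartition partition : offsets.keySet()) {
        if ("__consumer_offsets".equals(partition.topic()))
            throw new InvalidTopicException(
                "Offsets cannot be committed for the internal topic " + partition.topic());
    }
}
{code}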

> Offsets can be committed for the offsets topic
> --
>
> Key: KAFKA-4746
> URL: https://issues.apache.org/jira/browse/KAFKA-4746
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.0
>Reporter: Grant Henke
>
> Though this is likely rare and I don't suspect too many people would try to do 
> this, we should prevent users from committing offsets for the offsets topic 
> into the offsets topic. This would essentially create an infinite loop in any 
> consumer consuming from that topic. Also committing offsets for a compacted 
> topic doesn't likely make sense anyway. 
> Here is a quick failing test I wrote to see if this guard exists:
> {code:title=OffsetCommitTest.scala|borderStyle=solid}
>  @Test
>   def testOffsetTopicOffsetCommit() {
> val topic1 = "__consumer_offsets"
> // Commit an offset
> val expectedReplicaAssignment = Map(0  -> List(1))
> val commitRequest = OffsetCommitRequest(
>   groupId = group,
>   requestInfo = immutable.Map(TopicAndPartition(topic1, 0) -> 
> OffsetAndMetadata(offset=42L)),
>   versionId = 2
> )
> val commitResponse = simpleConsumer.commitOffsets(commitRequest)
> assertEquals(Errors.INVALID_TOPIC_EXCEPTION.code, 
> commitResponse.commitStatus.get(TopicAndPartition(topic1, 0)).get)
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4746) Offsets can be committed for the offsets topic

2017-02-08 Thread Grant Henke (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4746?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15858709#comment-15858709
 ] 

Grant Henke commented on KAFKA-4746:


I just mean that often, when working with a compacted topic, you read from the 
start of the topic every time your process restarts to see or rebuild "the 
current state".

But you are right, that is a bit of an overstatement. There are likely cases 
where a process commits an offset to try to resume where it left off, well 
aware that the offsets could have been cleaned since they were last committed. 
As I understand it, before KIP-58/KAFKA-1981 it was a race against the log 
cleaner whether the committed offset was still valid. Committing the offset 
also doesn't do anything to help ensure you didn't miss an offset that was 
cleaned while your application was not processing.

KIP-58/KAFKA-1981 fixed that by ensuring some time passes before cleaning, via 
min.compaction.lag.ms/min.compaction.lag.bytes/min.compaction.lag.messages

> Offsets can be committed for the offsets topic
> --
>
> Key: KAFKA-4746
> URL: https://issues.apache.org/jira/browse/KAFKA-4746
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.0
>Reporter: Grant Henke
>
> Though this is likely rare and I don't suspect too many people would try to do 
> this, we should prevent users from committing offsets for the offsets topic 
> into the offsets topic. This would essentially create an infinite loop in any 
> consumer consuming from that topic. Also committing offsets for a compacted 
> topic doesn't likely make sense anyway. 
> Here is a quick failing test I wrote to see if this guard exists:
> {code:title=OffsetCommitTest.scala|borderStyle=solid}
>  @Test
>   def testOffsetTopicOffsetCommit() {
> val topic1 = "__consumer_offsets"
> // Commit an offset
> val expectedReplicaAssignment = Map(0  -> List(1))
> val commitRequest = OffsetCommitRequest(
>   groupId = group,
>   requestInfo = immutable.Map(TopicAndPartition(topic1, 0) -> 
> OffsetAndMetadata(offset=42L)),
>   versionId = 2
> )
> val commitResponse = simpleConsumer.commitOffsets(commitRequest)
> assertEquals(Errors.INVALID_TOPIC_EXCEPTION.code, 
> commitResponse.commitStatus.get(TopicAndPartition(topic1, 0)).get)
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (KAFKA-4748) Need a way to shutdown all workers in a Streams application at the same time

2017-02-08 Thread Elias Levy (JIRA)
Elias Levy created KAFKA-4748:
-

 Summary: Need a way to shutdown all workers in a Streams 
application at the same time
 Key: KAFKA-4748
 URL: https://issues.apache.org/jira/browse/KAFKA-4748
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 0.10.1.1
Reporter: Elias Levy


If you have a fleet of Streams workers for an application and attempt to shut 
them down simultaneously (e.g. via SIGTERM and 
Runtime.getRuntime().addShutdownHook() and streams.close()), a large number of 
the workers fail to shut down.

The problem appears to be a race condition between the shutdown signal and the 
consumer rebalancing that is triggered by some of the workers exiting before 
others.  Workers that receive the signal later apparently fail to exit because 
they are caught in the rebalance.

Terminating workers in a rolling fashion is not advisable in some situations.  
A rolling shutdown will result in many unnecessary rebalances and may fail, 
as the application may have a large amount of local state that a smaller number 
of nodes may not be able to store.

It would appear that there is a need for a protocol change to allow the 
coordinator to signal a consumer group to shut down without triggering a 
rebalance.
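
For reference, the shutdown pattern described above is roughly the following 
(a sketch; builder and props stand in for the application's topology and 
configuration):

{code}
final KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();

// SIGTERM runs this hook on every instance at roughly the same time; the
// close() calls then race against the rebalances triggered by the instances
// that have already left the group.
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
    @Override
    public void run() {
        streams.close();
    }
}));
{code}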



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [VOTE] KIP-111 Kafka should preserve the Principal generated by the PrincipalBuilder while processing the request received on socket channel, on the broker.

2017-02-08 Thread Jun Rao
Hi, Mayuresh,

Thanks for the KIP. A few comments below.

1. It seems the problem that you are trying to address is that the Java
principal returned from KafkaChannel may have additional fields beyond the name
that are needed during authorization. Have you considered a customized
PrincipalBuilder that extracts all the needed fields from the Java principal and
squeezes them as JSON into the name of the returned principal? Then the
authorizer can just parse the JSON and extract the needed fields (a rough
sketch of what I mean follows these comments).

2. Could you explain how the default authorizer works now? Currently, the
code just compares the two principal objects. Are we converting the Java
principal to a KafkaPrincipal there?

3. Do we need to add the following method in PrincipalBuilder? The configs
are already passed in through configure() and an implementation can cache
it and use it in buildPrincipal(). It's also not clear to me where we call
the new and the old method, and whether both will be called or one of them
will be called.
Principal buildPrincipal(Map principalConfigs);

4. The KIP has "If users use their custom PrincipalBuilder, they will have
to implement their custom Authorizer as the out of box Authorizer that
Kafka provides uses KafkaPrincipal." This is not ideal for existing users.
Could we avoid that?
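
For 1, I was thinking of something roughly like the sketch below (purely
illustrative, written against the current PrincipalBuilder interface, with the
JSON building obviously simplified):

import java.security.Principal;
import java.util.Map;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.network.Authenticator;
import org.apache.kafka.common.network.TransportLayer;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.PrincipalBuilder;

public class JsonPrincipalBuilder implements PrincipalBuilder {

    @Override
    public void configure(Map<String, ?> configs) {
        // no configuration needed for this sketch
    }

    @Override
    public Principal buildPrincipal(TransportLayer transportLayer,
                                    Authenticator authenticator) throws KafkaException {
        try {
            Principal peer = transportLayer.peerPrincipal();
            // squeeze whatever extra fields the authorizer needs into the name as JSON
            String json = "{\"name\":\"" + peer.getName() + "\"}";
            return new KafkaPrincipal(KafkaPrincipal.USER_TYPE, json);
        } catch (Exception e) {
            throw new KafkaException("Failed to build principal", e);
        }
    }

    @Override
    public void close() throws KafkaException {
        // nothing to clean up
    }
}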

Thanks,

Jun


On Fri, Feb 3, 2017 at 11:25 AM, Mayuresh Gharat  wrote:

> Hi All,
>
> It seems that there is no further concern with the KIP-111. At this point
> we would like to start the voting process. The KIP can be found at
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=67638388
>
> Thanks,
>
> Mayuresh
>


[jira] [Commented] (KAFKA-4745) KafkaLZ4BlockOutputStream.java incorrectly finishes the last frame

2017-02-08 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15858686#comment-15858686
 ] 

Ismael Juma commented on KAFKA-4745:


Your improvement suggestion makes sense to me. Would you like to submit a PR?

> KafkaLZ4BlockOutputStream.java incorrectly finishes the last frame
> --
>
> Key: KAFKA-4745
> URL: https://issues.apache.org/jira/browse/KAFKA-4745
> Project: Kafka
>  Issue Type: Bug
>  Components: compression
>Affects Versions: 0.10.1.1
>Reporter: Will Droste
> Fix For: 0.10.1.1
>
>
> There is a scenario whereby, if the delegated OutputStream does not call flush 
> before close, there will be missing data in the stream. The reason for this is 
> that the stream is actually marked closed before it is actually flushed.
> The end mark is written before the flush; also, writeEndMark was finishing 
> the stream, so it is redundant in this context to mark it finished. In my fork 
> the 'finished=true' was removed from the 'writeEndMark' method.
> {code}
> @Override
> public void close() throws IOException {
> if (!finished) {
> writeEndMark();
> flush();
> finished = true;
> }
> if (out != null) {
> out.close();
> out = null;
> }
> }
> {code}
> should be
> {code}
> @Override
> public void close() throws IOException {
> if (!finished) {
> // finish any pending data
> writeBlock();
> // write out the end mark
> writeEndMark();
> // mark the stream as finished
> finished = true;
> }
> if (out != null) {
> out.close();
> out = null;
> }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4745) KafkaLZ4BlockOutputStream.java incorrectly finishes the last frame

2017-02-08 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15858687#comment-15858687
 ] 

Ismael Juma commented on KAFKA-4745:


Also `writeEndMark` sets finished=true as well.

> KafkaLZ4BlockOutputStream.java incorrectly finishes the last frame
> --
>
> Key: KAFKA-4745
> URL: https://issues.apache.org/jira/browse/KAFKA-4745
> Project: Kafka
>  Issue Type: Bug
>  Components: compression
>Affects Versions: 0.10.1.1
>Reporter: Will Droste
> Fix For: 0.10.1.1
>
>
> There is a scenario whereby, if the delegated OutputStream does not call flush 
> before close, there will be missing data in the stream. The reason for this is 
> that the stream is actually marked closed before it is actually flushed.
> The end mark is written before the flush; also, writeEndMark was finishing 
> the stream, so it is redundant in this context to mark it finished. In my fork 
> the 'finished=true' was removed from the 'writeEndMark' method.
> {code}
> @Override
> public void close() throws IOException {
> if (!finished) {
> writeEndMark();
> flush();
> finished = true;
> }
> if (out != null) {
> out.close();
> out = null;
> }
> }
> {code}
> should be
> {code}
> @Override
> public void close() throws IOException {
> if (!finished) {
> // finish any pending data
> writeBlock();
> // write out the end mark
> writeEndMark();
> // mark the stream as finished
> finished = true;
> }
> if (out != null) {
> out.close();
> out = null;
> }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4746) Offsets can be committed for the offsets topic

2017-02-08 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4746?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15858659#comment-15858659
 ] 

Jason Gustafson commented on KAFKA-4746:


I never thought of someone trying this, but it makes sense to forbid it. I'm 
not sure why you say committing offsets for a compacted topic doesn't make 
sense though. Can you elaborate?

> Offsets can be committed for the offsets topic
> --
>
> Key: KAFKA-4746
> URL: https://issues.apache.org/jira/browse/KAFKA-4746
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.0
>Reporter: Grant Henke
>
> Though this is likely rare and I don't suspect too many people would try to do 
> this, we should prevent users from committing offsets for the offsets topic 
> into the offsets topic. This would essentially create an infinite loop in any 
> consumer consuming from that topic. Also committing offsets for a compacted 
> topic doesn't likely make sense anyway. 
> Here is a quick failing test I wrote to see if this guard exists:
> {code:title=OffsetCommitTest.scala|borderStyle=solid}
>  @Test
>   def testOffsetTopicOffsetCommit() {
> val topic1 = "__consumer_offsets"
> // Commit an offset
> val expectedReplicaAssignment = Map(0  -> List(1))
> val commitRequest = OffsetCommitRequest(
>   groupId = group,
>   requestInfo = immutable.Map(TopicAndPartition(topic1, 0) -> 
> OffsetAndMetadata(offset=42L)),
>   versionId = 2
> )
> val commitResponse = simpleConsumer.commitOffsets(commitRequest)
> assertEquals(Errors.INVALID_TOPIC_EXCEPTION.code, 
> commitResponse.commitStatus.get(TopicAndPartition(topic1, 0)).get)
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (KAFKA-4747) add metrics for KafkaConsumer.poll

2017-02-08 Thread Onur Karaman (JIRA)
Onur Karaman created KAFKA-4747:
---

 Summary: add metrics for KafkaConsumer.poll
 Key: KAFKA-4747
 URL: https://issues.apache.org/jira/browse/KAFKA-4747
 Project: Kafka
  Issue Type: Improvement
Reporter: Onur Karaman
Assignee: Onur Karaman


KafkaConsumer heavily depends on KafkaConsumer.poll yet we don't have metrics 
directly associated with it.

We probably want to add two metrics:
1. time spent in KafkaConsumer.poll
2. time since last KafkaConsumer.poll (measured as now - endTimeOfLastPoll)
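
A rough sketch of how these could be wired up with the client-side metrics 
classes (the metric names and group, and the metrics/time/endTimeOfLastPoll 
fields, are illustrative):

{code}
Sensor pollTime = metrics.sensor("poll-time");
pollTime.add(metrics.metricName("poll-time-avg", "consumer-metrics",
    "Average time spent in KafkaConsumer.poll"), new Avg());
pollTime.add(metrics.metricName("poll-time-max", "consumer-metrics",
    "Max time spent in KafkaConsumer.poll"), new Max());

Sensor timeSinceLastPoll = metrics.sensor("time-since-last-poll");
timeSinceLastPoll.add(metrics.metricName("time-since-last-poll-avg", "consumer-metrics",
    "Average gap between successive KafkaConsumer.poll calls"), new Avg());

// inside poll():
long start = time.milliseconds();
if (endTimeOfLastPoll > 0)
    timeSinceLastPoll.record(start - endTimeOfLastPoll);   // metric 2: time since last poll
// ... existing poll logic ...
endTimeOfLastPoll = time.milliseconds();
pollTime.record(endTimeOfLastPoll - start);                // metric 1: time spent in poll
{code}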



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Jenkins build is back to normal : kafka-trunk-jdk8 #1259

2017-02-08 Thread Apache Jenkins Server
See 



Re: KIP-122: Add a tool to Reset Consumer Group Offsets

2017-02-08 Thread Jorge Esteban Quilcate Otoya
Great. I think I got the idea. What about these options:

Scenarios:

1. Current status

´kafka-consumer-groups.sh --reset-offset --group cg1´

2. To Datetime

´kafka-consumer-groups.sh --reset-offset --group cg1 --reset-to-datetime
2017-01-01T00:00:00.000´

3. To Period

´kafka-consumer-groups.sh --reset-offset --group cg1 --reset-to-period P2D´

4. To Earliest

´kafka-consumer-groups.sh --reset-offset --group cg1 --reset-to-earliest´

5. To Latest

´kafka-consumer-groups.sh --reset-offset --group cg1 --reset-to-latest´

6. Minus 'n' offsets

´kafka-consumer-groups.sh --reset-offset --group cg1 --reset-minus n´

7. Plus 'n' offsets

´kafka-consumer-groups.sh --reset-offset --group cg1 --reset-plus n´

8. To specific offset

´kafka-consumer-groups.sh --reset-offset --group cg1 --reset-to x´

Scopes:

a. All topics used by Consumer Group

Don't specify --topics

b. Specific List of Topics

Add list of values in --topics t1,t2,tn

c. One Topic, all Partitions

Add one topic and no partitions values: --topic t1

d. One Topic, List of Partitions

Add one topic and partitions values: --topic t1 --partitions 0,1,2

About Reset Plan (JSON file):

I think it is still valid to have the option to persist the reset configuration
as a file, but I agree with giving the option to run the tool without going down
to the JSON file.

Execution options:

1. Without execution argument (No args):

Print out results (reset plan)

2. With --execute argument:

Run reset process

3. With --output argument:

Save result in a JSON format.

4. Only with --execute option and --reset-file (path to JSON)

Reset based on file

5. Only with --verify option and --reset-file (path to JSON)

Verify file values with current offsets

I think we can remove --generate-and-execute because it is a bit clumsy.

With these options we will still be able to execute with a manual JSON configuration.
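
Under the hood, scenarios 2 to 5 map quite directly onto existing consumer
APIs. For example, a rough sketch of the datetime case (assuming the group is
stopped, the consumer is created with the target group.id, and
partitionsToReset/targetTimestampMs come from the arguments):

Map<TopicPartition, Long> request = new HashMap<>();
for (TopicPartition tp : partitionsToReset)
    request.put(tp, targetTimestampMs);

Map<TopicPartition, OffsetAndTimestamp> found = consumer.offsetsForTimes(request);

Map<TopicPartition, OffsetAndMetadata> newOffsets = new HashMap<>();
for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : found.entrySet())
    if (entry.getValue() != null)   // null means no message at or after that timestamp
        newOffsets.put(entry.getKey(), new OffsetAndMetadata(entry.getValue().offset()));

consumer.commitSync(newOffsets);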


On Wed, Feb 8, 2017 at 10:43 PM, Ben Stopford ()
wrote:

> Yes - using a tool like this to skip a set of consumer groups over a
> corrupt/bad message is definitely appealing.
>
> B
>
> On Wed, Feb 8, 2017 at 9:37 PM Gwen Shapira  wrote:
>
> > I like the --reset-to-earliest and --reset-to-latest. In general,
> > since the JSON route is the most challenging for users, we want to
> > provide a lot of ways to do useful things without going there.
> >
> > Two things that can help:
> >
> > 1. A lot of times, users want to skip few messages that cause issues
> > and continue. maybe just specifying the topic, partition and delta
> > will be better than having to find the offset and write a JSON and
> > validate the JSON etc.
> >
> > 2. Thinking if there are other common use-cases that we can make easy
> > rather than just one generic but not very usable method.
> >
> > Gwen
> >
> > On Wed, Feb 8, 2017 at 3:25 AM, Jorge Esteban Quilcate Otoya
> >  wrote:
> > > Thanks for the feedback!
> > >
> > > @Onur, @Gwen:
> > >
> > > Agree. Actually at the first draft I considered to have it inside
> > > ´kafka-consumer-groups.sh´, but I decide to propose it as a standalone
> > tool
> > > to describe it clearly and focus it on reset functionality.
> > >
> > > But now that you mentioned, it does make sense to have it in
> > > ´kafka-consumer-groups.sh´. How would be a consistent way to introduce
> > it?
> > >
> > > Maybe something like this:
> > >
> > > ´kafka-consumer-groups.sh --reset-offset --generate --group cg1
> --topics
> > t1
> > > --reset-from 2017-01-01T00:00:00.000 --output plan.json´
> > >
> > > ´kafka-consumer-groups.sh --reset-offset --verify --reset-json-file
> > > plan.json´
> > >
> > > ´kafka-consumer-groups.sh --reset-offset --execute --reset-json-file
> > > plan.json´
> > >
> > > ´kafka-consumer-groups.sh --reset-offset --generate-and-execute --group
> > cg1
> > > --topics t1 --reset-from 2017-01-01T00:00:00.000´
> > >
> > > @Gwen:
> > >
> > >> It looks exactly like the replica assignment tool
> > >
> > > It was influenced by ;-) I use the generate-verify-execute process here
> > to
> > > make sure user will be aware of the result of this operation. At the
> > > beginning we considered only add a couple of options to Consumer Group
> > > Command:
> > >
> > > --rewind-to-timestamp and --rewind-to-period
> > >
> > > @Onur:
> > >
> > >> You can actually get away with overriding while members of the group
> > are live
> > > with method 2 by using group information from DescribeGroupsRequest.
> > >
> > > This means that we need to have Consumer Group stopped before executing
> > and
> > > start a new consumer internally to do this? Therefore, we won't be able
> > to
> > > consider executing reset when ConsumerGroup is active? (trying to
> relate
> > it
> > > with @Dong 5th question)
> > >
> > > @Dong:
> > >
> > >> Should we allow user to use wildcard to reset offset of all groups
> for a
> > > given topic as well?
> > >
> > > I haven't thought about this scenario. Could be interesting. Following
> > the
> > > 

Jenkins build is back to normal : kafka-trunk-jdk7 #1922

2017-02-08 Thread Apache Jenkins Server
See 



Re: [VOTE] KIP-48 Support for delegation tokens as an authentication mechanism

2017-02-08 Thread Jun Rao
Hi, Mani,

Thanks for the responses. A few more comments.

101.2/101.3. Could we just remove owner and renewer from
DelegationTokenResponse if we don't have a use case?

111. ExpireTokenResponse: Should we return the new expiration time in the
response?

112. DescribeTokenRequest: A common use case is probably to see a list of
tokens associated with a particular owner. Would it be useful to include a
list of owners in the request? We can use the same convention in other
requests such that if the list is set to null (i.e., length is -1), return
all tokens.

113. delegation.token.master.key: It seems that it needs to be the same across
all brokers? Perhaps we can mention that in the wiki.

114. Could we document the procedure for manually rotating the secret? Does
one have to do something like: expire all existing tokens, rotate the secret,
and generate new tokens?

115. Could we also include in the command line the ability to describe
tokens?

116. Could you document the ACL rules associated with those new requests?
For example, do we allow anyone to create, delete, or describe delegation
tokens?

Thanks,

Jun


On Wed, Feb 8, 2017 at 1:35 AM, Manikumar  wrote:

> Hi Jun,
>
>
> > If a token expires, then every broker will potentially try to delete it
> > around the same time, but only one will succeed. So, we will have to deal
> > with failures in that case? Another way is to let just one broker (say,
> the
> > controller) deletes expired tokens.
> >
> >
>  Agree, we can run the token expiry check thread as part of controller
> broker.
>  WIll update the KIP.
>
>
> Thanks,
> Manikumar
>
>
> >
> > On Sun, Feb 5, 2017 at 9:54 AM, Manikumar 
> > wrote:
> >
> > > Hi Jun,
> > >
> > >  Please see the replies inline.
> > >
> > >
> > > > >
> > > > > Only one broker does the deletion. Broker updates the expiration in
> > its
> > > > > local cache
> > > > > and on zookeeper so other brokers also get notified and their cache
> > > > > statuses are updated as well.
> > > > >
> > > > >
> > > > Which broker does the deletion?
> > > >
> > >
> > > Any broker can handle the create/expire/renew/describe delegationtoken
> > > requests.
> > > changes are propagated through zk notifications.  Every broker is
> > > responsible for
> > > expiring the tokens. This check be can done during request handling
> time
> > > and/or
> > > during token authentication time.
> > >
> > >
> > > >
> > > >
> > > > 110. The diagrams in the wiki still show MD5 digest. Could you change
> > it
> > > to
> > > > SCRAM?
> > > >
> > > >
> > >   Updated the diagram.
> > >
> > >
> > >
> > > Thanks,
> > > Manikumar
> > >
> > >
> > >
> > >
> > > >
> > > >
> > > > >
> > > > > Thanks.
> > > > > Manikumar
> > > > >
> > > > >
> > > > > >
> > > > > > On Fri, Dec 23, 2016 at 9:26 AM, Manikumar <
> > > manikumar.re...@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi,
> > > > > > >
> > > > > > > I would like to initiate the vote on KIP-48:
> > > > > > >
> > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-48+
> > > > > > > Delegation+token+support+for+Kafka
> > > > > > >
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Manikumar
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>


[jira] [Created] (KAFKA-4746) Offsets can be committed for the offsets topic

2017-02-08 Thread Grant Henke (JIRA)
Grant Henke created KAFKA-4746:
--

 Summary: Offsets can be committed for the offsets topic
 Key: KAFKA-4746
 URL: https://issues.apache.org/jira/browse/KAFKA-4746
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.9.0.0
Reporter: Grant Henke


Though this is likely rare and I don't suspect too many people would try to do 
this, we should prevent users from committing offsets for the offsets topic 
into the offsets topic. This would essentially create an infinite loop in any 
consumer consuming from that topic. Also committing offsets for a compacted 
topic doesn't likely make sense anyway. 

Here is a quick failing test I wrote to see if this guard exists:

{code:title=OffsetCommitTest.scala|borderStyle=solid}
 @Test
  def testOffsetTopicOffsetCommit() {
val topic1 = "__consumer_offsets"
// Commit an offset
val expectedReplicaAssignment = Map(0  -> List(1))
val commitRequest = OffsetCommitRequest(
  groupId = group,
  requestInfo = immutable.Map(TopicAndPartition(topic1, 0) -> 
OffsetAndMetadata(offset=42L)),
  versionId = 2
)
val commitResponse = simpleConsumer.commitOffsets(commitRequest)

assertEquals(Errors.INVALID_TOPIC_EXCEPTION.code, 
commitResponse.commitStatus.get(TopicAndPartition(topic1, 0)).get)
  }
{code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4738) Remove generic type of class ClientState

2017-02-08 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15858581#comment-15858581
 ] 

Guozhang Wang commented on KAFKA-4738:
--

I'd suggest we only remove generic T in this JIRA. As for generic C, production 
code today uses UUID while the tests use Integer. UUID is not an ideal unique 
identifier for a client, so it may be replaced in the future; keeping generic C 
would be fine for now.

> Remove generic type of class ClientState
> 
>
> Key: KAFKA-4738
> URL: https://issues.apache.org/jira/browse/KAFKA-4738
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Sharad
>Priority: Minor
>  Labels: beginner, newbie
>
> Currently, class 
> {{org.apache.kafka.streams.processor.internals.assignment.ClientState}} 
> uses a generic type. However, within actual Streams code base the type will 
> always be {{TaskId}} (from package {{org.apache.kafka.streams.processor}}).
> Thus, this ticket is about removing the generic type and replace it with 
> {{TaskId}}, to simplify the code base.
> There are some tests that use {{ClientState}} (which allows for a 
> slightly simplified test setup).  Those tests need to be updated to work 
> properly using {{TaskId}} instead of {{Integer}}.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: KIP-122: Add a tool to Reset Consumer Group Offsets

2017-02-08 Thread Ben Stopford
Yes - using a tool like this to skip a set of consumer groups over a
corrupt/bad message is definitely appealing.

B

On Wed, Feb 8, 2017 at 9:37 PM Gwen Shapira  wrote:

> I like the --reset-to-earliest and --reset-to-latest. In general,
> since the JSON route is the most challenging for users, we want to
> provide a lot of ways to do useful things without going there.
>
> Two things that can help:
>
> 1. A lot of times, users want to skip few messages that cause issues
> and continue. maybe just specifying the topic, partition and delta
> will be better than having to find the offset and write a JSON and
> validate the JSON etc.
>
> 2. Thinking if there are other common use-cases that we can make easy
> rather than just one generic but not very usable method.
>
> Gwen
>
> On Wed, Feb 8, 2017 at 3:25 AM, Jorge Esteban Quilcate Otoya
>  wrote:
> > Thanks for the feedback!
> >
> > @Onur, @Gwen:
> >
> > Agree. Actually at the first draft I considered to have it inside
> > ´kafka-consumer-groups.sh´, but I decide to propose it as a standalone
> tool
> > to describe it clearly and focus it on reset functionality.
> >
> > But now that you mentioned, it does make sense to have it in
> > ´kafka-consumer-groups.sh´. How would be a consistent way to introduce
> it?
> >
> > Maybe something like this:
> >
> > ´kafka-consumer-groups.sh --reset-offset --generate --group cg1 --topics
> t1
> > --reset-from 2017-01-01T00:00:00.000 --output plan.json´
> >
> > ´kafka-consumer-groups.sh --reset-offset --verify --reset-json-file
> > plan.json´
> >
> > ´kafka-consumer-groups.sh --reset-offset --execute --reset-json-file
> > plan.json´
> >
> > ´kafka-consumer-groups.sh --reset-offset --generate-and-execute --group
> cg1
> > --topics t1 --reset-from 2017-01-01T00:00:00.000´
> >
> > @Gwen:
> >
> >> It looks exactly like the replica assignment tool
> >
> > It was influenced by ;-) I use the generate-verify-execute process here
> to
> > make sure user will be aware of the result of this operation. At the
> > beginning we considered only add a couple of options to Consumer Group
> > Command:
> >
> > --rewind-to-timestamp and --rewind-to-period
> >
> > @Onur:
> >
> >> You can actually get away with overriding while members of the group
> are live
> > with method 2 by using group information from DescribeGroupsRequest.
> >
> > This means that we need to have Consumer Group stopped before executing
> and
> > start a new consumer internally to do this? Therefore, we won't be able
> to
> > consider executing reset when ConsumerGroup is active? (trying to relate
> it
> > with @Dong 5th question)
> >
> > @Dong:
> >
> >> Should we allow user to use wildcard to reset offset of all groups for a
> > given topic as well?
> >
> > I haven't thought about this scenario. Could be interesting. Following
> the
> > recommendation to add it into Consumer Group Command, in this case Group
> > argument will be optional if there are only 1 topic. I think for multiple
> > topic won't be that useful.
> >
> >> Should we allow user to specify timestamp per topic partition in the
> json
> > file as well?
> >
> > Don't think this could be a valid from the tool, but if Reset Plan is
> > generated, and user want to set the offset for a specific partition to
> > other offset (eventually based on another timestamp), and execute it, it
> > will be up to her/him.
> >
> >> Should the script take some credential file to make sure that this
> > operation is authenticated given the potential impact of this operation?
> >
> > Haven't tried to secure brokers yet, but the tool should support
> > authorization if it's enabled in the broker.
> >
> >> Should we provide constant to reset committed offset to earliest/latest
> > offset of a partition, e.g. -1 indicates earliest offset and -2 indicates
> > latest offset.
> >
> > I will go for something like ´--reset-to-earliest´ and
> ´--reset-to-latest´
> >
> >> Should we allow dynamic change of the comitted offset when consumer are
> > running, such that consumer will seek to the newly committed offset and
> > start consuming from there?
> >
> > Not sure about this. I will recommend to keep it simple and ask user to
> > stop consumers first. But I would considered it if the trade-offs are
> > clear.
> >
> > @Matthias
> >
> > Added :). And thanks a lot for your help to define this KIP!
> >
> >
> >
> > On Wed, Feb 8, 2017 at 7:47 AM, Gwen Shapira ()
> > wrote:
> >
> >> As long as the CLI is a bit consistent? Like, not just adding 3
> >> arguments and a JSON parser to the existing tool, right?
> >>
> >> On Tue, Feb 7, 2017 at 10:29 PM, Onur Karaman
> >>  wrote:
> >> > I think it makes sense to just add the feature to
> >> kafka-consumer-groups.sh
> >> >
> >> > On Tue, Feb 7, 2017 at 10:24 PM, Gwen Shapira 
> wrote:
> >> >
> >> >> Thanks for the KIP. I'm super happy about adding the capability.
> >> >>
> >> >> I hate 

Re: KIP-122: Add a tool to Reset Consumer Group Offsets

2017-02-08 Thread Gwen Shapira
I like the --reset-to-earliest and --reset-to-latest. In general,
since the JSON route is the most challenging for users, we want to
provide a lot of ways to do useful things without going there.

Two things that can help:

1. A lot of times, users want to skip a few messages that cause issues
and continue. Maybe just specifying the topic, partition and delta
will be better than having to find the offset, write a JSON file and
validate the JSON, etc.

2. Thinking if there are other common use-cases that we can make easy
rather than just one generic but not very usable method.

Gwen

On Wed, Feb 8, 2017 at 3:25 AM, Jorge Esteban Quilcate Otoya
 wrote:
> Thanks for the feedback!
>
> @Onur, @Gwen:
>
> Agree. Actually at the first draft I considered to have it inside
> ´kafka-consumer-groups.sh´, but I decide to propose it as a standalone tool
> to describe it clearly and focus it on reset functionality.
>
> But now that you mentioned, it does make sense to have it in
> ´kafka-consumer-groups.sh´. How would be a consistent way to introduce it?
>
> Maybe something like this:
>
> ´kafka-consumer-groups.sh --reset-offset --generate --group cg1 --topics t1
> --reset-from 2017-01-01T00:00:00.000 --output plan.json´
>
> ´kafka-consumer-groups.sh --reset-offset --verify --reset-json-file
> plan.json´
>
> ´kafka-consumer-groups.sh --reset-offset --execute --reset-json-file
> plan.json´
>
> ´kafka-consumer-groups.sh --reset-offset --generate-and-execute --group cg1
> --topics t1 --reset-from 2017-01-01T00:00:00.000´
>
> @Gwen:
>
>> It looks exactly like the replica assignment tool
>
> It was influenced by ;-) I use the generate-verify-execute process here to
> make sure user will be aware of the result of this operation. At the
> beginning we considered only add a couple of options to Consumer Group
> Command:
>
> --rewind-to-timestamp and --rewind-to-period
>
> @Onur:
>
>> You can actually get away with overriding while members of the group are live
> with method 2 by using group information from DescribeGroupsRequest.
>
> This means that we need to have Consumer Group stopped before executing and
> start a new consumer internally to do this? Therefore, we won't be able to
> consider executing reset when ConsumerGroup is active? (trying to relate it
> with @Dong 5th question)
>
> @Dong:
>
>> Should we allow user to use wildcard to reset offset of all groups for a
> given topic as well?
>
> I haven't thought about this scenario. Could be interesting. Following the
> recommendation to add it into Consumer Group Command, in this case Group
> argument will be optional if there are only 1 topic. I think for multiple
> topic won't be that useful.
>
>> Should we allow user to specify timestamp per topic partition in the json
> file as well?
>
> Don't think this could be a valid from the tool, but if Reset Plan is
> generated, and user want to set the offset for a specific partition to
> other offset (eventually based on another timestamp), and execute it, it
> will be up to her/him.
>
>> Should the script take some credential file to make sure that this
> operation is authenticated given the potential impact of this operation?
>
> Haven't tried to secure brokers yet, but the tool should support
> authorization if it's enabled in the broker.
>
>> Should we provide constant to reset committed offset to earliest/latest
> offset of a partition, e.g. -1 indicates earliest offset and -2 indicates
> latest offset.
>
> I will go for something like ´--reset-to-earliest´ and ´--reset-to-latest´
>
>> Should we allow dynamic change of the comitted offset when consumer are
> running, such that consumer will seek to the newly committed offset and
> start consuming from there?
>
> Not sure about this. I will recommend to keep it simple and ask user to
> stop consumers first. But I would considered it if the trade-offs are
> clear.
>
> @Matthias
>
> Added :). And thanks a lot for your help to define this KIP!
>
>
>
> On Wed, Feb 8, 2017 at 7:47 AM, Gwen Shapira ()
> wrote:
>
>> As long as the CLI is a bit consistent? Like, not just adding 3
>> arguments and a JSON parser to the existing tool, right?
>>
>> On Tue, Feb 7, 2017 at 10:29 PM, Onur Karaman
>>  wrote:
>> > I think it makes sense to just add the feature to
>> kafka-consumer-groups.sh
>> >
>> > On Tue, Feb 7, 2017 at 10:24 PM, Gwen Shapira  wrote:
>> >
>> >> Thanks for the KIP. I'm super happy about adding the capability.
>> >>
>> >> I hate the interface, though. It looks exactly like the replica
>> >> assignment tool. A tool everyone loves so much that there are multiple
>> >> projects, open and closed, that try to fix it.
>> >>
>> >> Can we swap it with something that looks a bit more like the consumer
>> >> group tool? or the kafka streams reset tool? Consistency is helpful in
>> >> such cases. I spent some time learning existing tools and learning yet
>> >> another one is a deterrent.

[jira] [Commented] (KAFKA-4702) Parametrize streams benchmarks to run at scale

2017-02-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15858546#comment-15858546
 ] 

ASF GitHub Bot commented on KAFKA-4702:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2478


> Parametrize streams benchmarks to run at scale
> --
>
> Key: KAFKA-4702
> URL: https://issues.apache.org/jira/browse/KAFKA-4702
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.3.0
>Reporter: Eno Thereska
>Assignee: Eno Thereska
> Fix For: 0.10.3.0
>
>
> The streams benchmarks (in SimpleBenchmark.java and triggered through 
> kafka/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py) 
> run as single-instance, with a simple 1 broker Kafka cluster. 
> We need to parametrize the tests so they can run at scale, e.g., with 10-100 
> KafkaStreams instances and similar number of brokers.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] kafka pull request #2478: KAFKA-4702: Parametrize streams benchmarks to run ...

2017-02-08 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2478




[jira] [Resolved] (KAFKA-4702) Parametrize streams benchmarks to run at scale

2017-02-08 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4702?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-4702.
--
   Resolution: Fixed
Fix Version/s: 0.10.3.0

Issue resolved by pull request 2478
[https://github.com/apache/kafka/pull/2478]

> Parametrize streams benchmarks to run at scale
> --
>
> Key: KAFKA-4702
> URL: https://issues.apache.org/jira/browse/KAFKA-4702
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.3.0
>Reporter: Eno Thereska
>Assignee: Eno Thereska
> Fix For: 0.10.3.0
>
>
> The streams benchmarks (in SimpleBenchmark.java and triggered through 
> kafka/tests/kafkatest/benchmarks/streams/streams_simple_benchmark_test.py) 
> run as single-instance, with a simple 1 broker Kafka cluster. 
> We need to parametrize the tests so they can run at scale, e.g., with 10-100 
> KafkaStreams instances and similar number of brokers.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] kafka pull request #2514: MINOR: Add logging when commitSync fails in Stream...

2017-02-08 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2514




[GitHub] kafka pull request #2517: HOTFIX: renamed test so it is picked up by ducktap...

2017-02-08 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2517




[jira] [Resolved] (KAFKA-4745) KafkaLZ4BlockOutputStream.java incorrectly finishes the last frame

2017-02-08 Thread Will Droste (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4745?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Will Droste resolved KAFKA-4745.

   Resolution: Duplicate
Fix Version/s: 0.10.1.1

> KafkaLZ4BlockOutputStream.java incorrectly finishes the last frame
> --
>
> Key: KAFKA-4745
> URL: https://issues.apache.org/jira/browse/KAFKA-4745
> Project: Kafka
>  Issue Type: Bug
>  Components: compression
>Affects Versions: 0.10.1.1
>Reporter: Will Droste
> Fix For: 0.10.1.1
>
>
> There is a scenario whereby, if the delegated OutputStream does not call flush 
> before close, data will be missing from the stream. The reason for this is that 
> the stream is marked closed before it is actually flushed.
> The end mark is written before the flush; also, writeEndMark was finishing 
> the stream, so it is redundant in this context to mark it finished. In my fork 
> the 'finished=true' was removed from the 'writeEndMark' method.
> {code}
> @Override
> public void close() throws IOException {
> if (!finished) {
> writeEndMark();
> flush();
> finished = true;
> }
> if (out != null) {
> out.close();
> out = null;
> }
> }
> {code}
> should be
> {code}
> @Override
> public void close() throws IOException {
> if (!finished) {
> // finish any pending data
> writeBlock();
> // write out the end mark
> writeEndMark();
> // mark the stream as finished
> finished = true;
> }
> if (out != null) {
> out.close();
> out = null;
> }
> }
> {code}





[jira] [Commented] (KAFKA-4745) KafkaLZ4BlockOutputStream.java incorrectly finishes the last frame

2017-02-08 Thread Will Droste (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15858450#comment-15858450
 ] 

Will Droste commented on KAFKA-4745:


Excellent, thanks for the quick reply. Next time I'll check the latest code; I 
must have been on a tag.

IMHO I would not call 'flush' there, because the underlying OutputStream should 
finish the stream on close if it correctly abides by the spec, so it should not 
be required. However, if the design decision here is to be conservative, I 
totally understand. I often flush and then close at the application layer to be 
sure.
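For illustration only, a minimal JDK-only sketch of that belt-and-braces pattern at the application layer (the file name and payload are made up, nothing Kafka-specific):

{code}
// Illustration only: flush explicitly before close, so correctness does not
// depend on how the wrapped stream implements close(). Plain JDK streams.
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;

public class FlushThenClose {
    public static void main(String[] args) throws IOException {
        byte[] payload = "some data".getBytes(StandardCharsets.UTF_8);
        try (OutputStream out = new BufferedOutputStream(Files.newOutputStream(Paths.get("payload.bin")))) {
            out.write(payload);
            out.flush(); // push buffered bytes down before close() runs
        }
    }
}
{code}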



> KafkaLZ4BlockOutputStream.java incorrectly finishes the last frame
> --
>
> Key: KAFKA-4745
> URL: https://issues.apache.org/jira/browse/KAFKA-4745
> Project: Kafka
>  Issue Type: Bug
>  Components: compression
>Affects Versions: 0.10.1.1
>Reporter: Will Droste
>
> There is a scenario whereby, if the delegated OutputStream does not call flush 
> before close, data will be missing from the stream. The reason for this is that 
> the stream is marked closed before it is actually flushed.
> The end mark is written before the flush; also, writeEndMark was finishing 
> the stream, so it is redundant in this context to mark it finished. In my fork 
> the 'finished=true' was removed from the 'writeEndMark' method.
> {code}
> @Override
> public void close() throws IOException {
> if (!finished) {
> writeEndMark();
> flush();
> finished = true;
> }
> if (out != null) {
> out.close();
> out = null;
> }
> }
> {code}
> should be
> {code}
> @Override
> public void close() throws IOException {
> if (!finished) {
> // finish any pending data
> writeBlock();
> // write out the end mark
> writeEndMark();
> // mark the stream as finished
> finished = true;
> }
> if (out != null) {
> out.close();
> out = null;
> }
> }
> {code}





[jira] [Updated] (KAFKA-4716) Fix logic for re-checking if internal topic is ready

2017-02-08 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4716?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-4716:
-
Labels: architecture  (was: )

> Fix logic for re-checking if internal topic is ready
> 
>
> Key: KAFKA-4716
> URL: https://issues.apache.org/jira/browse/KAFKA-4716
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Eno Thereska
>  Labels: architecture
> Fix For: 0.10.3.0
>
>
> In InternalTopicManager, we have a hardcoded constant MAX_TOPIC_READY_TRY 
> that is set to 5. We shouldn't hardcode the retry count, and the behavior should be based 
> on a timeout, not on a number of retries.
> There are cases when the code in makeReady tries to create a topic but then 
> fails because the controller is currently in transition and we get a warning: 
> " Could not create internal topics: Could not create topic:  due 
> to This is not the correct controller for this cluster." The code proceeds to 
> retry MAX_TOPIC_READY_TRY times in a tight loop, and eventually fails. We 
> should have a retry backoff (perhaps just use retry.backoff.ms) and a timeout 
> (perhaps just use request.timeout.ms) instead of a number of retries.
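
A minimal sketch of the proposed shape (a deadline plus a backoff instead of a fixed retry count). This is not InternalTopicManager code: the values would come from retry.backoff.ms and request.timeout.ms, and topicReady() is a hypothetical readiness check.

{code}
// Hedged sketch only: replace a fixed retry count with a deadline and a backoff.
public class TopicReadyWaiter {

    static void waitForTopic(String topic, long retryBackoffMs, long requestTimeoutMs)
            throws InterruptedException {
        final long deadline = System.currentTimeMillis() + requestTimeoutMs; // request.timeout.ms
        while (!topicReady(topic)) {
            if (System.currentTimeMillis() >= deadline) {
                throw new IllegalStateException("Timed out waiting for internal topic " + topic);
            }
            Thread.sleep(retryBackoffMs); // retry.backoff.ms: back off instead of a tight loop
        }
    }

    // Hypothetical placeholder: the real check would query broker metadata for the topic.
    static boolean topicReady(String topic) {
        return false;
    }
}
{code}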





[jira] [Commented] (KAFKA-4716) Fix logic for re-checking if internal topic is ready

2017-02-08 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15858439#comment-15858439
 ] 

Guozhang Wang commented on KAFKA-4716:
--

Coming from this commit: 
https://github.com/apache/kafka/pull/2478/commits/7d1968cce975d07c760b2248c6578e5841f5cece

It seems a run with scale = 2 is doomed to fail because of this issue?

> Fix logic for re-checking if internal topic is ready
> 
>
> Key: KAFKA-4716
> URL: https://issues.apache.org/jira/browse/KAFKA-4716
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Eno Thereska
>  Labels: architecture
> Fix For: 0.10.3.0
>
>
> In InternalTopicManager, we have a hardcoded constant MAX_TOPIC_READY_TRY 
> that is set to 5. We shouldn't hardcode the retry count, and the behavior should be based 
> on a timeout, not on a number of retries.
> There are cases when the code in makeReady tries to create a topic but then 
> fails because the controller is currently in transition and we get a warning: 
> " Could not create internal topics: Could not create topic:  due 
> to This is not the correct controller for this cluster." The code proceeds to 
> retry MAX_TOPIC_READY_TRY times in a tight loop, and eventually fails. We 
> should have a retry backoff (perhaps just use retry.backoff.ms) and a timeout 
> (perhaps just use request.timeout.ms) instead of a number of retries.





[jira] [Commented] (KAFKA-4745) KafkaLZ4BlockOutputStream.java incorrectly finishes the last frame

2017-02-08 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15858434#comment-15858434
 ] 

Ismael Juma commented on KAFKA-4745:


This code has changed since 0.10.1.1 to be closer to what you suggested:

https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java#L260



> KafkaLZ4BlockOutputStream.java incorrectly finishes the last frame
> --
>
> Key: KAFKA-4745
> URL: https://issues.apache.org/jira/browse/KAFKA-4745
> Project: Kafka
>  Issue Type: Bug
>  Components: compression
>Affects Versions: 0.10.1.1
>Reporter: Will Droste
>
> There is a scenario whereby, if the delegated OutputStream does not call flush 
> before close, data will be missing from the stream. The reason for this is that 
> the stream is marked closed before it is actually flushed.
> The end mark is written before the flush; also, writeEndMark was finishing 
> the stream, so it is redundant in this context to mark it finished. In my fork 
> the 'finished=true' was removed from the 'writeEndMark' method.
> {code}
> @Override
> public void close() throws IOException {
> if (!finished) {
> writeEndMark();
> flush();
> finished = true;
> }
> if (out != null) {
> out.close();
> out = null;
> }
> }
> {code}
> should be
> {code}
> @Override
> public void close() throws IOException {
> if (!finished) {
> // finish any pending data
> writeBlock();
> // write out the end mark
> writeEndMark();
> // mark the stream as finished
> finished = true;
> }
> if (out != null) {
> out.close();
> out = null;
> }
> }
> {code}





[jira] [Updated] (KAFKA-4648) Improve test coverage StreamTask

2017-02-08 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4648?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-4648:
-
Resolution: Fixed
Status: Resolved  (was: Patch Available)

Issue resolved by pull request 2451
[https://github.com/apache/kafka/pull/2451]

> Improve test coverage StreamTask
> 
>
> Key: KAFKA-4648
> URL: https://issues.apache.org/jira/browse/KAFKA-4648
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Damian Guy
>Assignee: Damian Guy
>Priority: Minor
> Fix For: 0.10.3.0
>
>
> Exception paths in {{schedule}}, {{closeTopology}}, {{punctuate}}





[jira] [Commented] (KAFKA-4648) Improve test coverage StreamTask

2017-02-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15858429#comment-15858429
 ] 

ASF GitHub Bot commented on KAFKA-4648:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2451


> Improve test coverage StreamTask
> 
>
> Key: KAFKA-4648
> URL: https://issues.apache.org/jira/browse/KAFKA-4648
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Damian Guy
>Assignee: Damian Guy
>Priority: Minor
> Fix For: 0.10.3.0
>
>
> Exception paths in {{schedule}}, {{closeTopology}}, {{punctuate}}





[GitHub] kafka pull request #2451: KAFKA-4648: Improve test coverage StreamTask

2017-02-08 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2451




[jira] [Created] (KAFKA-4745) KafkaLZ4BlockOutputStream.java incorrectly finishes the last frame

2017-02-08 Thread Will Droste (JIRA)
Will Droste created KAFKA-4745:
--

 Summary: KafkaLZ4BlockOutputStream.java incorrectly finishes the 
last frame
 Key: KAFKA-4745
 URL: https://issues.apache.org/jira/browse/KAFKA-4745
 Project: Kafka
  Issue Type: Bug
  Components: compression
Affects Versions: 0.10.1.1
Reporter: Will Droste


There is a scenario whereby, if the delegated OutputStream does not call flush 
before close, data will be missing from the stream. The reason for this is that 
the stream is marked closed before it is actually flushed.

The end mark is written before the flush; also, writeEndMark was finishing 
the stream, so it is redundant in this context to mark it finished. In my fork 
the 'finished=true' was removed from the 'writeEndMark' method.

{code}
@Override
public void close() throws IOException {
if (!finished) {
writeEndMark();
flush();
finished = true;
}
if (out != null) {
out.close();
out = null;
}
}
{code}

should be

{code}
@Override
public void close() throws IOException {
if (!finished) {
// finish any pending data
writeBlock();
// write out the end mark
writeEndMark();
// mark the stream as finished
finished = true;
}
if (out != null) {
out.close();
out = null;
}
}
{code}






Re: KIP-121 [Discuss]: Add KStream peek method

2017-02-08 Thread Steven Schlansker
Yes, thank you everyone for your input!

I will incorporate the latest round of test revisions
and submit a VOTE thread later today :)

> On Feb 8, 2017, at 10:48 AM, Eno Thereska  wrote:
> 
> Steven,
> 
> Sounds like we can start a VOTE thread on this? Is the KIP up to date with 
> all the latest comments?
> 
> Thanks
> Eno
>> On 8 Feb 2017, at 18:05, Matthias J. Sax  wrote:
>> 
>> I like this idea. But to get clean and concise PRs, I would prefer to
>> have a JIRA and extra PR for this.
>> 
>> WDYT?
>> 
>> 
>> -Matthias
>> 
>> On 2/8/17 9:35 AM, Guozhang Wang wrote:
>>> The KIP proposal LGTM, thanks Steven!
>>> 
>>> One meta comment on the PR itself: I'm wondering if we could refactor
>>> the implementation of `KStream.print() / writeAsText()` to just be a
>>> special impl of `peek()`, like we did for `count` as a special case of `aggregate`?
>>> I.e. we can replace the `KeyValuePrinter` class with an internal ForEach
>>> impl within `peek()`.
>>> 
>>> 
>>> Guozhang
>>> 
>>> 
>>> On Tue, Feb 7, 2017 at 11:09 AM, Gwen Shapira  wrote:
>>> 
 Far better! Thank you!
 
 On Tue, Feb 7, 2017 at 10:19 AM, Steven Schlansker
  wrote:
> Thanks for the feedback.  I improved the javadoc a bit, do you like it
 better?
> 
>   /**
>* Perform an action on each record of {@code KStream}.
>* This is a stateless record-by-record operation (cf. {@link
 #process(ProcessorSupplier, String...)}).
>*
>* Peek is a non-terminal operation that triggers a side effect
 (such as logging or statistics collection)
>* and returns an unchanged stream.
>*
>* Note that since this operation is stateless, it may execute
 multiple times for a single record in failure cases.
>*
>* @param action an action to perform on each record
>* @see #process(ProcessorSupplier, String...)
>*/
>   KStream peek(final ForeachAction action);
> 
> Updated in-place on the PR.
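
To make the semantics above concrete, a hedged usage sketch of the proposed peek(): the counter, the topic names, and the surrounding builder/topology calls are illustrative (0.10.x KStream style), not taken from the KIP.

{code}
// Hedged usage sketch: count records as a side effect and let the stream pass through
// unchanged. "builder" is assumed to be a KStreamBuilder; topic names are made up.
// Assumed imports: java.util.Locale, java.util.concurrent.atomic.AtomicLong.
final AtomicLong processed = new AtomicLong();

KStream<String, String> source = builder.stream("input-topic");
source
    .peek((key, value) -> processed.incrementAndGet())  // side effect only, stream unchanged
    .mapValues(value -> value.toUpperCase(Locale.ROOT)) // downstream processing continues as usual
    .to("output-topic");
{code}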
> 
>> On Feb 7, 2017, at 2:19 AM, Michael Noll  wrote:
>> 
>> Many thanks for the KIP and the PR, Steven!
>> 
>> My opinion, too, is that we should consider including this.
>> 
>> One thing that I would like to see clarified is the difference between
 the
>> proposed peek() and existing functions map() and foreach(), for
 instance.
>> My understanding (see also the Java 8 links below) is that:
>> 
>> - Like `map`, `peek` will return a KStream.  This also means that,
 unlike
>> `foreach`, `peek` is not a terminal operation.
>> - The main purpose of `peek` is, similar to `foreach`, the *side
 effects*
>> (such as the metrics counter example in the KIP) -- and, on a related
 note,
>> also to express your *intent* to achieve such side effects in the first
>> place (which is similar to when to use `foreach` rather than `map`); and
>> typically you should not (must not?) modify the underlying stream itself
>> (unlike `map`, which is supposed to do exactly that).
>> 
>> For reference, here are the descriptions of peek, map, foreach in Java
 8.
>> I could have also included links to StackOverflow questions where people
>> were confused about when (not) to use peek. ;-)
>> 
>> https://docs.oracle.com/javase/8/docs/api/java/util/
 stream/Stream.html#peek-java.util.function.Consumer-
>> https://docs.oracle.com/javase/8/docs/api/java/util/
 stream/Stream.html#map-java.util.function.Function-
>> https://docs.oracle.com/javase/8/docs/api/java/util/
 stream/Stream.html#forEach-java.util.function.Consumer-
>> 
>> Best wishes,
>> Michael
>> 
>> 
>> 
>> 
>> 
>> 
>> On Tue, Feb 7, 2017 at 10:37 AM, Damian Guy 
 wrote:
>> 
>>> Hi Steven,
>>> Thanks for the KIP. I think this is a worthy addition to the API.
>>> 
>>> Thanks,
>>> Damian
>>> 
>>> On Tue, 7 Feb 2017 at 09:30 Eno Thereska 
 wrote:
>>> 
 Hi,
 
 I like the proposal, thank you. I have found it frustrating myself
 not to
 be able to understand simple things, like how many records have been
 currently processed. The peek method would allow those kinds of
>>> diagnostics
 and debugging.
 
 Gwen, it is possible to do this with the existing functionality like
 map,
 but you'd have to fake the map method. Also, it is not great using map
>>> for
 things it was not intended for. Having an explicit peek makes it
 clearer
>>> in
 my opinion.
 
 Thanks
 Eno
 
> On 7 Feb 2017, at 03:20, Gwen Shapira  wrote:
> 
> I've read the wiki and am unclear about the 

[jira] [Commented] (KAFKA-4195) support throttling on request rate

2017-02-08 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15858410#comment-15858410
 ] 

Jun Rao commented on KAFKA-4195:


Ismael brought up a good point that it may not be easy to set a request rate 
limit (e.g., is it 10/sec or 100/sec). An alternative way is to model the 
request handler pool as a shared resource and allow the admin to set a max 
percentage (e.g., 5%) that a user/client can use. This should cover the case 
when a client floods the broker with many requests, and potentially some other 
bad cases.

Implementation-wise, we can measure the fraction of request handler capacity 
used in KafkaApis.handle() and delay the processing of the request if the limit 
is exceeded.
We may also need to think a bit about how to integrate the new throttling with the 
existing throttling that we already have on byte rate for producers/consumers and 
followers.
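
A hedged sketch of that idea, not broker code: all names are illustrative, and it ignores details such as the number of handler threads and per-user bookkeeping.

{code}
// Hedged sketch: track how much request-handler time a client used in a rolling window
// and compute a delay once a configured fraction of capacity is exceeded.
final class HandlerCapacityQuota {
    private final double maxFraction;   // e.g. 0.05 for "at most 5% of handler capacity"
    private final long windowMs;        // length of the measurement window
    private long windowStart = System.currentTimeMillis();
    private long usedNanos = 0;

    HandlerCapacityQuota(double maxFraction, long windowMs) {
        this.maxFraction = maxFraction;
        this.windowMs = windowMs;
    }

    // Called after handling a request; returns how long to delay, if at all.
    synchronized long recordAndGetDelayMs(long handlerTimeNanos) {
        long now = System.currentTimeMillis();
        if (now - windowStart >= windowMs) {   // roll over to a fresh window
            windowStart = now;
            usedNanos = 0;
        }
        usedNanos += handlerTimeNanos;
        long allowedNanos = (long) (maxFraction * windowMs * 1_000_000L);
        if (usedNanos <= allowedNanos) {
            return 0;
        }
        // One simple policy: delay by the overage; a real implementation would need more care.
        return (usedNanos - allowedNanos) / 1_000_000L;
    }
}
{code}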


> support throttling on request rate
> --
>
> Key: KAFKA-4195
> URL: https://issues.apache.org/jira/browse/KAFKA-4195
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jun Rao
>Assignee: Rajini Sivaram
>
> Currently, we can throttle the client by data volume. However, if a client 
> sends requests too quickly (e.g., a consumer with min.byte configured to 0), 
> it can still overwhelm the broker. It would be useful to additionally support 
> throttling by request rate. 





Re: KIP-121 [Discuss]: Add KStream peek method

2017-02-08 Thread Eno Thereska
Steven,

Sounds like we can start a VOTE thread on this? Is the KIP up to date with all 
the latest comments?

Thanks
Eno
> On 8 Feb 2017, at 18:05, Matthias J. Sax  wrote:
> 
> I like this idea. But to get clean and concise PRs, I would prefer to
> have a JIRA and extra PR for this.
> 
> WDYT?
> 
> 
> -Matthias
> 
> On 2/8/17 9:35 AM, Guozhang Wang wrote:
>> The KIP proposal LGTM, thanks Steven!
>> 
>> One meta comment on the PR itself: I'm wondering if we could refactor
>> the implementation of `KStream.print() / writeAsText()` to just be a
>> special impl of `peek()`, like we did for `count` as a special case of `aggregate`?
>> I.e. we can replace the `KeyValuePrinter` class with an internal ForEach
>> impl within `peek()`.
>> 
>> 
>> Guozhang
>> 
>> 
>> On Tue, Feb 7, 2017 at 11:09 AM, Gwen Shapira  wrote:
>> 
>>> Far better! Thank you!
>>> 
>>> On Tue, Feb 7, 2017 at 10:19 AM, Steven Schlansker
>>>  wrote:
 Thanks for the feedback.  I improved the javadoc a bit, do you like it
>>> better?
 
/**
 * Perform an action on each record of {@code KStream}.
 * This is a stateless record-by-record operation (cf. {@link
>>> #process(ProcessorSupplier, String...)}).
 *
 * Peek is a non-terminal operation that triggers a side effect
>>> (such as logging or statistics collection)
 * and returns an unchanged stream.
 *
 * Note that since this operation is stateless, it may execute
>>> multiple times for a single record in failure cases.
 *
 * @param action an action to perform on each record
 * @see #process(ProcessorSupplier, String...)
 */
KStream peek(final ForeachAction action);
 
 Updated in-place on the PR.
 
> On Feb 7, 2017, at 2:19 AM, Michael Noll  wrote:
> 
> Many thanks for the KIP and the PR, Steven!
> 
> My opinion, too, is that we should consider including this.
> 
> One thing that I would like to see clarified is the difference between
>>> the
> proposed peek() and existing functions map() and foreach(), for
>>> instance.
> My understanding (see also the Java 8 links below) is that:
> 
> - Like `map`, `peek` will return a KStream.  This also means that,
>>> unlike
> `foreach`, `peek` is not a terminal operation.
> - The main purpose of `peek` is, similar to `foreach`, the *side
>>> effects*
> (such as the metrics counter example in the KIP) -- and, on a related
>>> note,
> also to express your *intent* to achieve such side effects in the first
> place (which is similar to when to use `foreach` rather than `map`); and
> typically you should not (must not?) modify the underlying stream itself
> (unlike `map`, which is supposed to do exactly that).
> 
> For reference, here are the descriptions of peek, map, foreach in Java
>>> 8.
> I could have also included links to StackOverflow questions where people
> were confused about when (not) to use peek. ;-)
> 
> https://docs.oracle.com/javase/8/docs/api/java/util/
>>> stream/Stream.html#peek-java.util.function.Consumer-
> https://docs.oracle.com/javase/8/docs/api/java/util/
>>> stream/Stream.html#map-java.util.function.Function-
> https://docs.oracle.com/javase/8/docs/api/java/util/
>>> stream/Stream.html#forEach-java.util.function.Consumer-
> 
> Best wishes,
> Michael
> 
> 
> 
> 
> 
> 
> On Tue, Feb 7, 2017 at 10:37 AM, Damian Guy 
>>> wrote:
> 
>> Hi Steven,
>> Thanks for the KIP. I think this is a worthy addition to the API.
>> 
>> Thanks,
>> Damian
>> 
>> On Tue, 7 Feb 2017 at 09:30 Eno Thereska 
>>> wrote:
>> 
>>> Hi,
>>> 
>>> I like the proposal, thank you. I have found it frustrating myself
>>> not to
>>> be able to understand simple things, like how many records have been
>>> currently processed. The peek method would allow those kinds of
>> diagnostics
>>> and debugging.
>>> 
>>> Gwen, it is possible to do this with the existing functionality like
>>> map,
>>> but you'd have to fake the map method. Also, it is not great using map
>> for
>>> things it was not intended for. Having an explicit peek makes it
>>> clearer
>> in
>>> my opinion.
>>> 
>>> Thanks
>>> Eno
>>> 
 On 7 Feb 2017, at 03:20, Gwen Shapira  wrote:
 
 I've read the wiki and am unclear about the proposal. Can you provide
 something like a Javadoc for peek()? What would this method do?
 
 Also, forgive me if I'm missing an important point here, but can't I
 put the println statement in a map()?
 
 On Mon, Feb 6, 2017 at 5:48 PM, Matthias J. Sax <
>>> matth...@confluent.io
>>> 
>>> 

Re: Streams: TTLCacheStore

2017-02-08 Thread Elias Levy
The use case is a simple one.  You can think of it as an update mechanism.
One stream is a set of tuples consisting of consumer id, an object id, the
value of some property of the object, and a timestamp.  This stream
represents a record of what we told some consumer the value of some
property of the object was at a point in time.  The other stream is a set
of tuples consisting of object id, object property value, and timestamp.
This stream is just the change log of the value of the object property.
The job joins these streams to generate messages to clients to inform them of
changes to an object property that they previously inquired about.

The requirement for a TTL cache rather than a standard key-value store
comes from the fact that we only wish to generate such an update if we last
informed the client about the property within some time span.  If there
were no TTL, the state would grow without bounds, as there is no signal to
remove an entry from the state.

Note that this is not the same as a KStream-KStream windowed join.  For the
first stream we are only interested in keeping track of an object's
property value last handed out to a client.  For the second stream we are
only interested in keeping track of the latest value of an object's
property. It more closely resembles a KTable-KTable join, but one where the
entries of the table have a TTL and one table has a key that is a subset of
the other (object id vs the compound [object id, agent id]).

My guess is that others have use cases that call for a store that expires
entries, as otherwise the store may grow unbounded, since there are no signals
other than time to trigger removal of entries from the store.

As for the use of the RocksDB TTL feature, after looking at it, I don't think
it will be a good fit.  The TTL is computed from insertion time, not from a
timestamp associated with an entry that can be passed in.  That would be
problematic if a stream is delayed or you are reprocessing old data.
Therefore the segmented store seems like a better basis for an
implementation.
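
To illustrate the event-time point, a minimal, self-contained sketch (plain Java, not Kafka Streams code) where expiry is driven by record timestamps rather than insertion wall-clock time:

{code}
// Minimal sketch of an event-time TTL cache: entries are stamped with the record
// timestamp and expired against the highest timestamp seen so far ("stream time"),
// so reprocessing old data behaves the same as processing it live.
import java.util.HashMap;
import java.util.Map;

final class EventTimeTtlCache<K, V> {
    private static final class Entry<V> {
        final V value;
        final long timestamp;
        Entry(V value, long timestamp) { this.value = value; this.timestamp = timestamp; }
    }

    private final long ttlMs;
    private final Map<K, Entry<V>> entries = new HashMap<>();
    private long streamTime = Long.MIN_VALUE;

    EventTimeTtlCache(long ttlMs) { this.ttlMs = ttlMs; }

    void put(K key, V value, long recordTimestamp) {
        streamTime = Math.max(streamTime, recordTimestamp);
        entries.put(key, new Entry<>(value, recordTimestamp));
    }

    V get(K key) {
        Entry<V> e = entries.get(key);
        if (e == null) return null;
        if (streamTime - e.timestamp > ttlMs) {
            entries.remove(key);  // lazily drop an expired entry
            return null;
        }
        return e.value;
    }
}
{code}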



On Wed, Feb 8, 2017 at 9:53 AM, Guozhang Wang  wrote:

> Hello Elias,
>
> I would love to solicit more feedback from the community on how commonly
> a persistent TTL KV store would be used. Maybe you can share your use cases first
> here in this thread?
>
> As for its implementation, I think leveraging RocksDB's TTL feature would
> be a good option. One tricky part, though, is how we can insert the
> corresponding "tombstone" messages into the changelogs as well in an
> efficient way (this is also the main reason we did not add this feature in
> the first release of Kafka Streams). I remember RocksDB's TTL feature does
> have a compaction listener interface, but I am not sure if it is available in
> JNI. That may be worth exploring.
>
>
> Guozhang
>
>
> On Mon, Feb 6, 2017 at 8:17 PM, Elias Levy 
> wrote:
>
> > We have a use case within a Streams application that requires a
> persistent
> > TTL key-value cache store.  As Streams does not currently offer such a
> > store, I've implemented it by abusing WindowStore, as it allows for a
> > configurable retention period.  Nonetheless, it is not an ideal solution,
> > as you can only iterate forward on the iterator returned by fetch(),
> > whereas the use case calls for a reverse iterator, as we are only
> > interested in the latest value for an entry.
> >
> > I am curious as to the appetite for a KIP to add such a TTL caching
> store.
> > KAFKA-4212 is the issue I opened requesting such a store.  Do others
> have a
> > need for them?  If there is interest in such a KIP, I can get one
> started.
> >
> > If there is interest, there are two ways such a store could be
> > implemented.  It could make use of the RocksDB TTL feature or it could mirror
> > WindowStore and make use of multiple segmented RocksDBs, possibly reusing the
> > RocksDBSegmentedBytesStore from the latest refactoring of the stores.  The
> > former delegates most of the work to RocksDB compaction, although likely at
> > the expense of greater write amplification.  The latter is more efficient at
> > dropping expired entries, but potentially more space inefficient.
> >
> > Thoughts?
> >
>
>
>
> --
> -- Guozhang
>


Build failed in Jenkins: kafka-trunk-jdk7 #1921

2017-02-08 Thread Apache Jenkins Server
See 

Changes:

[ismael] KAFKA-4741; Fix potential buffer leak in RecordAccumulator in case of

--
[...truncated 19627 lines...]
org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddSourceWithSameName STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddSourceWithSameName PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddMoreThanOnePatternSourceNode STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddMoreThanOnePatternSourceNode PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddProcessorWithSameName STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddProcessorWithSameName PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldSetCorrectSourceNodesWithRegexUpdatedTopics STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldSetCorrectSourceNodesWithRegexUpdatedTopics PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddSourceWithSameTopic STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddSourceWithSameTopic PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testTopicGroupsByStateStore STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testTopicGroupsByStateStore PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldAddSourceWithoutOffsetReset STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldAddSourceWithoutOffsetReset PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddStateStoreWithDuplicates STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddStateStoreWithDuplicates PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > testPatternSourceTopic 
STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > testPatternSourceTopic 
PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldAssociateStateStoreNameWhenStateStoreSupplierIsExternal STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldAssociateStateStoreNameWhenStateStoreSupplierIsExternal PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddSinkWithWrongParent STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddSinkWithWrongParent PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testPatternMatchesAlreadyProvidedTopicSource STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testPatternMatchesAlreadyProvidedTopicSource PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddSinkConnectedWithMultipleParent STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddSinkConnectedWithMultipleParent PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testSubscribeTopicNameAndPattern STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testSubscribeTopicNameAndPattern PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldAddPatternSourceWithoutOffsetReset STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldAddPatternSourceWithoutOffsetReset PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldNotAddNullInternalTopic STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldNotAddNullInternalTopic PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddProcessorWithWrongParent STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddProcessorWithWrongParent PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldNotAllowNullNameWhenAddingProcessor STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldNotAllowNullNameWhenAddingProcessor PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldAddSourcePatternWithOffsetReset STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldAddSourcePatternWithOffsetReset PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > testAddStateStore 
STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > testAddStateStore 
PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddSinkConnectedWithParent STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddSinkConnectedWithParent PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddStateStoreWithNonExistingProcessor STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
testAddStateStoreWithNonExistingProcessor PASSED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldAddInternalTopicConfigWithCleanupPolicyDeleteForInternalTopics STARTED

org.apache.kafka.streams.processor.TopologyBuilderTest > 
shouldAddInternalTopicConfigWithCleanupPolicyDeleteForInternalTopics PASSED


[jira] [Updated] (KAFKA-4462) Improved Kafka Client Compatibility Policy

2017-02-08 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4462?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao updated KAFKA-4462:
---
Fix Version/s: 0.10.2.0

> Improved Kafka Client Compatibility Policy
> --
>
> Key: KAFKA-4462
> URL: https://issues.apache.org/jira/browse/KAFKA-4462
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 0.10.1.1
>Reporter: Colin P. McCabe
>Assignee: Colin P. McCabe
>  Labels: kip
> Fix For: 0.10.2.0
>
>
> A proposal to improve the compatibility policy of the Kafka client by 
> supporting the combination of new client, old broker.  See 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-97%3A+Improved+Kafka+Client+RPC+Compatibility+Policy
>  for more details.





[jira] [Updated] (KAFKA-4506) Refactor AbstractRequest to contain version information

2017-02-08 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao updated KAFKA-4506:
---
Fix Version/s: 0.10.2.0

> Refactor AbstractRequest to contain version information
> ---
>
> Key: KAFKA-4506
> URL: https://issues.apache.org/jira/browse/KAFKA-4506
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients
>Reporter: Colin P. McCabe
>Assignee: Colin P. McCabe
> Fix For: 0.10.2.0
>
>
> Refactor AbstractRequest to contain version information.  Remove some client 
> code which implicitly assumes that the latest version of each message will 
> always be processed and used.  Always match the API version in request 
> headers to the request version.





[jira] [Assigned] (KAFKA-4740) Using new consumer API with a Deserializer that throws SerializationException can lead to infinite loop

2017-02-08 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4740?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson reassigned KAFKA-4740:
--

Assignee: Sébastien Launay

> Using new consumer API with a Deserializer that throws SerializationException 
> can lead to infinite loop
> ---
>
> Key: KAFKA-4740
> URL: https://issues.apache.org/jira/browse/KAFKA-4740
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1, 0.10.1.0, 0.10.1.1
> Environment: Kafka broker 0.10.1.1 (but this bug is not dependent on 
> the broker version)
> Kafka clients 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1, 0.10.1.0, 0.10.1.1
>Reporter: Sébastien Launay
>Assignee: Sébastien Launay
>Priority: Critical
>
> The old consumer supports deserializing records into typed objects and throws 
> a {{SerializationException}} through {{MessageAndMetadata#key()}} and 
> {{MessageAndMetadata#message()}} that can be caught by the client \[1\].
> When using the new consumer API with kafka-clients versions < 0.10.0.1, such 
> an exception is swallowed by the {{NetworkClient}} class and results in an 
> infinite loop which the client has no control over, like:
> {noformat}
> DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Resetting offset 
> for partition test2-0 to earliest offset.
> DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Fetched offset 0 
> for partition test2-0
> ERROR org.apache.kafka.clients.NetworkClient - Uncaught error in request 
> completion:
> org.apache.kafka.common.errors.SerializationException: Size of data received 
> by IntegerDeserializer is not 4
> ERROR org.apache.kafka.clients.NetworkClient - Uncaught error in request 
> completion:
> org.apache.kafka.common.errors.SerializationException: Size of data received 
> by IntegerDeserializer is not 4
> ...
> {noformat}
> Thanks to KAFKA-3977, this has been partially fixed in 0.10.1.0 but another 
> issue still remains.
> Indeed, the client can now catch the {{SerializationException}} but the next 
> call to {{Consumer#poll(long)}} will throw the same exception indefinitely.
> The following snippet (full example available on Github \[2\] for most 
> released kafka-clients versions):
> {code:java}
> try (KafkaConsumer kafkaConsumer = new 
> KafkaConsumer<>(consumerConfig, new StringDeserializer(), new 
> IntegerDeserializer())) {
> kafkaConsumer.subscribe(Arrays.asList("topic"));
> // Will run till the shutdown hook is called
> while (!doStop) {
> try {
> ConsumerRecords records = 
> kafkaConsumer.poll(1000);
> if (!records.isEmpty()) {
> logger.info("Got {} messages", records.count());
> for (ConsumerRecord record : records) {
> logger.info("Message with partition: {}, offset: {}, key: 
> {}, value: {}",
> record.partition(), record.offset(), record.key(), 
> record.value());
> }
> } else {
> logger.info("No messages to consume");
> }
> } catch (SerializationException e) {
> logger.warn("Failed polling some records", e);
> }
>  }
> }
> {code}
> when run with the following records (third record has an invalid Integer 
> value):
> {noformat}
> printf "\x00\x00\x00\x00\n" | bin/kafka-console-producer.sh --broker-list 
> localhost:9092 --topic topic
> printf "\x00\x00\x00\x01\n" | bin/kafka-console-producer.sh --broker-list 
> localhost:9092 --topic topic
> printf "\x00\x00\x00\n" | bin/kafka-console-producer.sh --broker-list 
> localhost:9092 --topic topic
> printf "\x00\x00\x00\x02\n" | bin/kafka-console-producer.sh --broker-list 
> localhost:9092 --topic topic
> {noformat}
> will produce the following logs:
> {noformat}
> INFO  consumer.Consumer - Got 2 messages
> INFO  consumer.Consumer - Message with partition: 0, offset: 0, key: null, 
> value: 0
> INFO  consumer.Consumer - Message with partition: 0, offset: 1, key: null, 
> value: 1
> WARN  consumer.Consumer - Failed polling some records
> org.apache.kafka.common.errors.SerializationException: Error deserializing 
> key/value for partition topic-0 at offset 2
> Caused by: org.apache.kafka.common.errors.SerializationException: Size of 
> data received by IntegerDeserializer is not 4
> WARN  consumer.Consumer - Failed polling some records
> org.apache.kafka.common.errors.SerializationException: Error deserializing 
> key/value for partition topic-0 at offset 2
> Caused by: org.apache.kafka.common.errors.SerializationException: Size of 
> data received by IntegerDeserializer is not 4
> WARN  

Re: Streams: TTLCacheStore

2017-02-08 Thread Guozhang Wang
Hello Elias,

I would love to solicit more feedback from the community on how commonly
a persistent TTL KV store would be used. Maybe you can share your use cases first
here in this thread?

As for its implementation, I think leveraging RocksDB's TTL feature would
be a good option. One tricky part, though, is how we can insert the
corresponding "tombstone" messages into the changelogs as well in an
efficient way (this is also the main reason we did not add this feature in
the first release of Kafka Streams). I remember RocksDB's TTL feature does
have a compaction listener interface, but I am not sure if it is available in
JNI. That may be worth exploring.


Guozhang


On Mon, Feb 6, 2017 at 8:17 PM, Elias Levy 
wrote:

> We have a use case within a Streams application that requires a persistent
> TTL key-value cache store.  As Streams does not currently offer such a
> store, I've implemented it by abusing WindowStore, as it allows for a
> configurable retention period.  Nonetheless, it is not an ideal solution,
> as you can only iterate forward on the iterator returned by fetch(),
> whereas the use case calls for a reverse iterator, as we are only
> interested in the latest value for an entry.
>
> I am curious as to the appetite for a KIP to add such a TTL caching store.
> KAFKA-4212 is the issue I opened requesting such a store.  Do others have a
> need for them?  If there is interest in such a KIP, I can get one started.
>
> If there is interest, there are two ways such a store could be
> implemented.  It could make use of RocksDB TTL feature or it could mirror
> WindowStore and make use multiple segmented RockDBs, possibly reusing the
> RocksDBSegmentedBytesStore from the latest refactoring of the stores.  The
> former deletes most of the work to RocksDB compaction, although likely at
> the expense of greater write amplification.  The later is more efficient at
> dropping expired entries, but potentially more space inefficient.
>
> Thoughts?
>



-- 
-- Guozhang


Re: [VOTE] KIP-48 Support for delegation tokens as an authentication mechanism

2017-02-08 Thread Roger Hoover
Thanks.  I found the other discussion thread.  Sorry for being behind on
this.

I'm interested in the future impersonation use cases.  This seems to get us
closer.

+1 (non-binding)

On Wed, Feb 8, 2017 at 4:41 AM, Manikumar  wrote:

> Hi Roger,
>
> In the current proposal, we only allow a user to get a delegation token for
> that same user.
> Anyone who gets that token can impersonate the user on the broker.
>
> Yes, in the future we can extend the support to allow a user to acquire
> delegation tokens for
> other users.
>
> Please refer to the DISCUSS mail thread for the impersonation-related discussion.
>
> Thanks,
> Manikumar
>
> On Wed, Feb 8, 2017 at 8:37 AM, Roger Hoover 
> wrote:
>
> > Hi Jun,
> >
> > How does it allow impersonation at the connection level?  Looking at the
> > KIP, the DelegationTokenRequest does not have an "Owner" field that can
> be
> > set.   The owner field of the DelegationTokenResponse says it's the
> "Kakfa
> > Principal which requested the delegation token".  For impersonation,
> don't
> > we need to be able to get tokens for other users besides the one making
> the
> > request?
> >
> > Thanks,
> >
> > Roger
> >
> > On Tue, Feb 7, 2017 at 6:45 PM, Jun Rao  wrote:
> >
> > > Hi, Roger,
> > >
> > > Just to clarify. This KIP already allows you to do impersonation at the
> > > connection level. Are you talking about impersonation at the request
> > level?
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Tue, Feb 7, 2017 at 5:53 PM, Roger Hoover 
> > > wrote:
> > >
> > > > Just wondering...how difficult would it be to later add
> > impersonation
> > > (
> > > > https://issues.apache.org/jira/browse/KAFKA-3712)?  One use case
> would
> > > be
> > > > a
> > > > Kafka admin UI that would take action on the cluster on behalf
> > different
> > > > users.I suppose we could later add an "effectiveUserId" (in Unix
> > > > terminology) to the token details?
> > > >
> > > > On Tue, Feb 7, 2017 at 5:25 PM, Grant Henke 
> > wrote:
> > > >
> > > > > +1 from me as well.
> > > > >
> > > > > On Tue, Feb 7, 2017 at 7:10 PM, Jason Gustafson <
> ja...@confluent.io>
> > > > > wrote:
> > > > >
> > > > > > Looks like a great proposal! I noticed that key rotation is not
> > > > included.
> > > > > > That may be reasonable for the initial work, but it might be nice
> > to
> > > > > share
> > > > > > some thoughts on how that might work in the future. For example,
> I
> > > > could
> > > > > > imagine delegation.token.master.key could be a list, which would
> > > allow
> > > > > > users to support both a new and old key at the same time while
> > > clients
> > > > > are
> > > > > > upgrading keys.
> > > > > >
> > > > > > -Jason
> > > > > >
> > > > > > On Tue, Feb 7, 2017 at 4:42 PM, Gwen Shapira 
> > > > wrote:
> > > > > >
> > > > > > > Read the KIP again and I think it looks good.
> > > > > > >
> > > > > > > +1 from me.
> > > > > > >
> > > > > > > On Tue, Feb 7, 2017 at 3:05 PM, Jun Rao 
> > wrote:
> > > > > > > > Hi, Mani,
> > > > > > > >
> > > > > > > > If a token expires, then every broker will potentially try to
> > > > delete
> > > > > it
> > > > > > > > around the same time, but only one will succeed. So, we will
> > have
> > > > to
> > > > > > deal
> > > > > > > > with failures in that case? Another way is to let just one
> > broker
> > > > > (say,
> > > > > > > the
> > > > > > > > controller) deletes expired tokens.
> > > > > > > >
> > > > > > > > It would also be helpful for others to give feedback on this
> > KIP.
> > > > > > Rajini,
> > > > > > > > Gwen, Ismael?
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > >
> > > > > > > > Jun
> > > > > > > >
> > > > > > > >
> > > > > > > > On Sun, Feb 5, 2017 at 9:54 AM, Manikumar <
> > > > manikumar.re...@gmail.com
> > > > > >
> > > > > > > wrote:
> > > > > > > >
> > > > > > > >> Hi Jun,
> > > > > > > >>
> > > > > > > >>  Please see the replies inline.
> > > > > > > >>
> > > > > > > >>
> > > > > > > >> > >
> > > > > > > >> > > Only one broker does the deletion. Broker updates the
> > > > expiration
> > > > > > in
> > > > > > > its
> > > > > > > >> > > local cache
> > > > > > > >> > > and on zookeeper so other brokers also get notified and
> > > their
> > > > > > cache
> > > > > > > >> > > statuses are updated as well.
> > > > > > > >> > >
> > > > > > > >> > >
> > > > > > > >> > Which broker does the deletion?
> > > > > > > >> >
> > > > > > > >>
> > > > > > > >> Any broker can handle the create/expire/renew/describe
> > > > > delegationtoken
> > > > > > > >> requests.
> > > > > > > >> changes are propagated through zk notifications.  Every
> broker
> > > is
> > > > > > > >> responsible for
> > > > > > > >> expiring the tokens. This check can be done during request
> > > > handling
> > > > > > time
> > > > > > > >> and/or
> > > > > > > >> during token authentication time.
> > > > > > > >>
> > > > > > > >>
> > 

Re: [VOTE] KIP-48 Support for delegation tokens as an authentication mechanism

2017-02-08 Thread Manikumar
Hi Jason,

As you noticed, the current proposal does not support rotation of the secret.
We also discussed maintaining a list of secret keys. Another option could
be using the controller to generate and rotate the secret and distribute it to
all brokers.
I will add the possible alternatives to the future work section of the KIP.
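
To sketch the list-of-secrets idea (purely illustrative, not the KIP-48 token or wire format): a token's HMAC is accepted if it verifies against any configured secret, so brokers can hold both the new and the old secret while clients roll over.

{code}
// Hedged sketch of rotating the delegation token secret by accepting a list of secrets.
// The HMAC scheme and names here are illustrative only.
import java.security.MessageDigest;
import java.util.List;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

final class TokenHmacValidator {
    private final List<byte[]> masterSecrets;  // e.g. [newSecret, oldSecret] during a rollover

    TokenHmacValidator(List<byte[]> masterSecrets) {
        this.masterSecrets = masterSecrets;
    }

    boolean isValid(byte[] tokenInfo, byte[] presentedHmac) throws Exception {
        for (byte[] secret : masterSecrets) {
            Mac mac = Mac.getInstance("HmacSHA256");
            mac.init(new SecretKeySpec(secret, "HmacSHA256"));
            if (MessageDigest.isEqual(mac.doFinal(tokenInfo), presentedHmac)) {
                return true;  // matches either the current or a previous secret
            }
        }
        return false;
    }
}
{code}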



On Wed, Feb 8, 2017 at 6:40 AM, Jason Gustafson  wrote:

> Looks like a great proposal! I noticed that key rotation is not included.
> That may be reasonable for the initial work, but it might be nice to share
> some thoughts on how that might work in the future. For example, I could
> imagine delegation.token.master.key could be a list, which would allow
> users to support both a new and old key at the same time while clients are
> upgrading keys.
>
> -Jason
>
> On Tue, Feb 7, 2017 at 4:42 PM, Gwen Shapira  wrote:
>
> > Read the KIP again and I think it looks good.
> >
> > +1 from me.
> >
> > On Tue, Feb 7, 2017 at 3:05 PM, Jun Rao  wrote:
> > > Hi, Mani,
> > >
> > > If a token expires, then every broker will potentially try to delete it
> > > around the same time, but only one will succeed. So, we will have to
> deal
> > > with failures in that case? Another way is to let just one broker (say,
> > the
> > > controller) deletes expired tokens.
> > >
> > > It would also be helpful for others to give feedback on this KIP.
> Rajini,
> > > Gwen, Ismael?
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > >
> > > On Sun, Feb 5, 2017 at 9:54 AM, Manikumar 
> > wrote:
> > >
> > >> Hi Jun,
> > >>
> > >>  Please see the replies inline.
> > >>
> > >>
> > >> > >
> > >> > > Only one broker does the deletion. Broker updates the expiration
> in
> > its
> > >> > > local cache
> > >> > > and on zookeeper so other brokers also get notified and their
> cache
> > >> > > statuses are updated as well.
> > >> > >
> > >> > >
> > >> > Which broker does the deletion?
> > >> >
> > >>
> > >> Any broker can handle the create/expire/renew/describe delegationtoken
> > >> requests.
> > >> changes are propagated through zk notifications.  Every broker is
> > >> responsible for
> > >> expiring the tokens. This check can be done during request handling
> time
> > >> and/or
> > >> during token authentication time.
> > >>
> > >>
> > >> >
> > >> >
> > >> > 110. The diagrams in the wiki still show MD5 digest. Could you
> change
> > it
> > >> to
> > >> > SCRAM?
> > >> >
> > >> >
> > >>   Updated the diagram.
> > >>
> > >>
> > >>
> > >> Thanks,
> > >> Manikumar
> > >>
> > >>
> > >>
> > >>
> > >> >
> > >> >
> > >> > >
> > >> > > Thanks.
> > >> > > Manikumar
> > >> > >
> > >> > >
> > >> > > >
> > >> > > > On Fri, Dec 23, 2016 at 9:26 AM, Manikumar <
> > >> manikumar.re...@gmail.com>
> > >> > > > wrote:
> > >> > > >
> > >> > > > > Hi,
> > >> > > > >
> > >> > > > > I would like to initiate the vote on KIP-48:
> > >> > > > >
> > >> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-48+
> > >> > > > > Delegation+token+support+for+Kafka
> > >> > > > >
> > >> > > > >
> > >> > > > > Thanks,
> > >> > > > > Manikumar
> > >> > > > >
> > >> > > >
> > >> > >
> > >> >
> > >>
> >
> >
> >
> > --
> > Gwen Shapira
> > Product Manager | Confluent
> > 650.450.2760 | @gwenshap
> > Follow us: Twitter | blog
> >
>


Build failed in Jenkins: kafka-trunk-jdk8 #1258

2017-02-08 Thread Apache Jenkins Server
See 

Changes:

[ismael] KAFKA-4741; Fix potential buffer leak in RecordAccumulator in case of

--
[...truncated 33580 lines...]
kafka.security.auth.SimpleAclAuthorizerTest > 
testHighConcurrencyModificationOfResourceAcls PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testLoadCache STARTED

kafka.security.auth.SimpleAclAuthorizerTest > testLoadCache PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testIsrAfterBrokerShutDownAndJoinsBack STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testIsrAfterBrokerShutDownAndJoinsBack PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAutoCreateTopicWithCollision STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAutoCreateTopicWithCollision PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokerListWithNoTopics STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokerListWithNoTopics PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > testAutoCreateTopic STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > testAutoCreateTopic PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > testGetAllTopicMetadata 
STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > testGetAllTopicMetadata 
PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterNewBrokerStartup STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterNewBrokerStartup PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > testBasicTopicMetadata 
STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > testBasicTopicMetadata PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAutoCreateTopicWithInvalidReplication STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAutoCreateTopicWithInvalidReplication PASSED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterABrokerShutdown STARTED

kafka.integration.SaslPlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterABrokerShutdown PASSED

kafka.integration.PrimitiveApiTest > testMultiProduce STARTED

kafka.integration.PrimitiveApiTest > testMultiProduce PASSED

kafka.integration.PrimitiveApiTest > testDefaultEncoderProducerAndFetch STARTED

kafka.integration.PrimitiveApiTest > testDefaultEncoderProducerAndFetch PASSED

kafka.integration.PrimitiveApiTest > testFetchRequestCanProperlySerialize 
STARTED

kafka.integration.PrimitiveApiTest > testFetchRequestCanProperlySerialize PASSED

kafka.integration.PrimitiveApiTest > testPipelinedProduceRequests STARTED

kafka.integration.PrimitiveApiTest > testPipelinedProduceRequests PASSED

kafka.integration.PrimitiveApiTest > testProduceAndMultiFetch STARTED

kafka.integration.PrimitiveApiTest > testProduceAndMultiFetch PASSED

kafka.integration.PrimitiveApiTest > 
testDefaultEncoderProducerAndFetchWithCompression STARTED

kafka.integration.PrimitiveApiTest > 
testDefaultEncoderProducerAndFetchWithCompression PASSED

kafka.integration.PrimitiveApiTest > testConsumerEmptyTopic STARTED

kafka.integration.PrimitiveApiTest > testConsumerEmptyTopic PASSED

kafka.integration.PrimitiveApiTest > testEmptyFetchRequest STARTED

kafka.integration.PrimitiveApiTest > testEmptyFetchRequest PASSED

kafka.integration.PlaintextTopicMetadataTest > 
testIsrAfterBrokerShutDownAndJoinsBack STARTED

kafka.integration.PlaintextTopicMetadataTest > 
testIsrAfterBrokerShutDownAndJoinsBack PASSED

kafka.integration.PlaintextTopicMetadataTest > testAutoCreateTopicWithCollision 
STARTED

kafka.integration.PlaintextTopicMetadataTest > testAutoCreateTopicWithCollision 
PASSED

kafka.integration.PlaintextTopicMetadataTest > testAliveBrokerListWithNoTopics 
STARTED

kafka.integration.PlaintextTopicMetadataTest > testAliveBrokerListWithNoTopics 
PASSED

kafka.integration.PlaintextTopicMetadataTest > testAutoCreateTopic STARTED

kafka.integration.PlaintextTopicMetadataTest > testAutoCreateTopic PASSED

kafka.integration.PlaintextTopicMetadataTest > testGetAllTopicMetadata STARTED

kafka.integration.PlaintextTopicMetadataTest > testGetAllTopicMetadata PASSED

kafka.integration.PlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterNewBrokerStartup STARTED

kafka.integration.PlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterNewBrokerStartup PASSED

kafka.integration.PlaintextTopicMetadataTest > testBasicTopicMetadata STARTED

kafka.integration.PlaintextTopicMetadataTest > testBasicTopicMetadata PASSED

kafka.integration.PlaintextTopicMetadataTest > 
testAutoCreateTopicWithInvalidReplication STARTED

kafka.integration.PlaintextTopicMetadataTest > 
testAutoCreateTopicWithInvalidReplication PASSED

kafka.integration.PlaintextTopicMetadataTest > 
testAliveBrokersListWithNoTopicsAfterABrokerShutdown STARTED

kafka.integration.PlaintextTopicMetadataTest > 

[jira] [Commented] (KAFKA-4740) Using new consumer API with a Deserializer that throws SerializationException can lead to infinite loop

2017-02-08 Thread JIRA

[ 
https://issues.apache.org/jira/browse/KAFKA-4740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15858165#comment-15858165
 ] 

Sébastien Launay commented on KAFKA-4740:
-

[~ijuma] Sure, I can take a peek at it.
I'll submit a PR against the Github repo to review changes once I have 
something ready.
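
In the meantime, a hedged client-side workaround sketch based only on the log lines quoted below: parse the failing partition and offset out of the exception message and seek past the bad record. The message format is an implementation detail, so this is brittle and only an illustration, not a proposed fix.

{code}
// Hedged workaround sketch: extract "partition <topic>-<p> at offset <o>" from the
// SerializationException message (as seen in the logs below) and seek past the bad record.
// This relies on an exception message format that is not a public contract.
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.SerializationException;

final class SkipBadRecord {
    private static final Pattern LOCATION = Pattern.compile("partition (.+)-(\\d+) at offset (\\d+)");

    static void seekPastBadRecord(KafkaConsumer<?, ?> consumer, SerializationException e) {
        Matcher m = LOCATION.matcher(String.valueOf(e.getMessage()));
        if (m.find()) {
            TopicPartition tp = new TopicPartition(m.group(1), Integer.parseInt(m.group(2)));
            long badOffset = Long.parseLong(m.group(3));
            consumer.seek(tp, badOffset + 1);  // resume after the record that cannot be deserialized
        }
    }
}
{code}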

> Using new consumer API with a Deserializer that throws SerializationException 
> can lead to infinite loop
> ---
>
> Key: KAFKA-4740
> URL: https://issues.apache.org/jira/browse/KAFKA-4740
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1, 0.10.1.0, 0.10.1.1
> Environment: Kafka broker 0.10.1.1 (but this bug is not dependent on 
> the broker version)
> Kafka clients 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1, 0.10.1.0, 0.10.1.1
>Reporter: Sébastien Launay
>Priority: Critical
>
> The old consumer supports deserializing records into typed objects and throws 
> a {{SerializationException}} through {{MessageAndMetadata#key()}} and 
> {{MessageAndMetadata#message()}} that can be caught by the client \[1\].
> When using the new consumer API with kafka-clients versions < 0.10.0.1, such 
> an exception is swallowed by the {{NetworkClient}} class and results in an 
> infinite loop which the client has no control over, like:
> {noformat}
> DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Resetting offset 
> for partition test2-0 to earliest offset.
> DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Fetched offset 0 
> for partition test2-0
> ERROR org.apache.kafka.clients.NetworkClient - Uncaught error in request 
> completion:
> org.apache.kafka.common.errors.SerializationException: Size of data received 
> by IntegerDeserializer is not 4
> ERROR org.apache.kafka.clients.NetworkClient - Uncaught error in request 
> completion:
> org.apache.kafka.common.errors.SerializationException: Size of data received 
> by IntegerDeserializer is not 4
> ...
> {noformat}
> Thanks to KAFKA-3977, this has been partially fixed in 0.10.1.0 but another 
> issue still remains.
> Indeed, the client can now catch the {{SerializationException}} but the next 
> call to {{Consumer#poll(long)}} will throw the same exception indefinitely.
> The following snippet (full example available on Github \[2\] for most 
> released kafka-clients versions):
> {code:java}
> try (KafkaConsumer kafkaConsumer = new 
> KafkaConsumer<>(consumerConfig, new StringDeserializer(), new 
> IntegerDeserializer())) {
> kafkaConsumer.subscribe(Arrays.asList("topic"));
> // Will run till the shutdown hook is called
> while (!doStop) {
> try {
> ConsumerRecords records = 
> kafkaConsumer.poll(1000);
> if (!records.isEmpty()) {
> logger.info("Got {} messages", records.count());
> for (ConsumerRecord record : records) {
> logger.info("Message with partition: {}, offset: {}, key: 
> {}, value: {}",
> record.partition(), record.offset(), record.key(), 
> record.value());
> }
> } else {
> logger.info("No messages to consume");
> }
> } catch (SerializationException e) {
> logger.warn("Failed polling some records", e);
> }
>  }
> }
> {code}
> when run with the following records (third record has an invalid Integer 
> value):
> {noformat}
> printf "\x00\x00\x00\x00\n" | bin/kafka-console-producer.sh --broker-list 
> localhost:9092 --topic topic
> printf "\x00\x00\x00\x01\n" | bin/kafka-console-producer.sh --broker-list 
> localhost:9092 --topic topic
> printf "\x00\x00\x00\n" | bin/kafka-console-producer.sh --broker-list 
> localhost:9092 --topic topic
> printf "\x00\x00\x00\x02\n" | bin/kafka-console-producer.sh --broker-list 
> localhost:9092 --topic topic
> {noformat}
> will produce the following logs:
> {noformat}
> INFO  consumer.Consumer - Got 2 messages
> INFO  consumer.Consumer - Message with partition: 0, offset: 0, key: null, 
> value: 0
> INFO  consumer.Consumer - Message with partition: 0, offset: 1, key: null, 
> value: 1
> WARN  consumer.Consumer - Failed polling some records
> org.apache.kafka.common.errors.SerializationException: Error deserializing 
> key/value for partition topic-0 at offset 2
> Caused by: org.apache.kafka.common.errors.SerializationException: Size of 
> data received by IntegerDeserializer is not 4
> WARN  consumer.Consumer - Failed polling some records
> org.apache.kafka.common.errors.SerializationException: Error deserializing 
> key/value for partition topic-0 at offset 2
> Caused by: 

[GitHub] kafka pull request #2517: HOTFIX: renamed test so it is picked up by ducktap...

2017-02-08 Thread enothereska
GitHub user enothereska opened a pull request:

https://github.com/apache/kafka/pull/2517

HOTFIX: renamed test so it is picked up by ducktape



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/enothereska/kafka hotfix-broker-test

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2517.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2517


commit d5eb7f10c80fdfbce31f679829b830e2e70ff977
Author: Eno Thereska 
Date:   2017-02-08T15:54:41Z

renamed test




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: [DISCUSS] KIP-117: Add a public AdministrativeClient API for Kafka admin operations

2017-02-08 Thread Ismael Juma
Thanks for elaborating Jay. I totally agree that we have to be very careful
in how we use our complexity budget. Easier said than done when people
don't agree on what is complex and what is simple. :) For example, I think
batch APIs are a significant source of complexity as you have to do a bunch
of ceremony to group things before sending the request and error handling
becomes more complex due to partial failures (things like `Try` or other
mechanisms that serve a similar role are then needed).

Maybe a way forward is to write API usage examples to help validate that
the suggested API is indeed easy to use.
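
For example, a usage sketch with purely hypothetical names (this is not the
proposed API, just the kind of example I mean), showing how a
CompletableFuture-based call composes and how the simple blocking case stays
simple:

    // Hypothetical names, for illustration only:
    AdminClient admin = AdminClient.create(props);
    CompletableFuture<List<String>> topics = admin.listTopics();
    topics.thenAccept(names -> names.forEach(System.out::println))
          .exceptionally(error -> { System.err.println("listTopics failed: " + error); return null; });
    // or, for the "just list my topics" case, simply block:
    List<String> topicNames = topics.join();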

Ismael

On Wed, Feb 8, 2017 at 4:40 AM, Jay Kreps  wrote:

> Totally agree on CompletableFuture. Also agree with some of the rough edges
> on the Consumer.
>
> I don't have much of a leg to stand on with the splitting vs not splitting
> thing, really hard to argue one or the other is better. I guess the one
> observation in watching us try to make good public apis over the years is I
> am kind of in favor of a particular kind of simple. In particular I think
> since the bar is sooo high in support and docs and the community of users
> so broad in the range of their capabilities, it makes it so there is a lot
> of value in dead simple interfaces that don't have a lot of conceptual
> weight, don't introduce a lot of new classes or concepts or general
> patterns that must be understood to use them correctly. So things like
> nesting, or the Try class, or async apis, or even just a complex set of
> classes representing arguments or return values kind of all stack
> conceptual burdens on the user to figure out correct usage. So like, for
> example, the Try class is very elegant and represents a whole generalized
> class of possibly completed actions, but the flip side is maybe I'm just a
> working guy who needs to list his kafka topics but doesn't know Rust, take
> pity on me! :-)
>
> Nit picking aside, super excited to see us progress on this.
>
> -Jay
>
> On Tue, Feb 7, 2017 at 3:46 PM Ismael Juma  wrote:
>
> > Hi Jay,
> >
> > Thanks for the feedback. Comments inline.
> >
> > On Tue, Feb 7, 2017 at 8:18 PM, Jay Kreps  wrote:
> > >
> > >- I think it would be good to not use "get" as the prefix for things
> > >making remote calls. We've tried to avoid the java getter convention
> > >entirely (see code style guide), but for remote calls in particular
> it
> > > kind
> > >of blurs the line between field access and remote RPC in a way that
> > > leads
> > >people to trouble. What about, e.g., fetchAllGroups() vs
> > getAllGroups().
> > >
> >
> > Agreed that we should avoid the `get` prefix for remote calls. There are
> a
> > few possible prefixes for the read operations: list, fetch, describe.
> >
> >
> > >- I think futures and callbacks are a bit of a pain to use. I'd
> second
> > >Becket's comment: let's ensure there a common use case motivating
> > these
> > >that wouldn't be just as easily satisfied with batch operations
> (which
> > > we
> > >seem to have at least for some things). In terms of flexibility
> > > Callbacks >
> > >Futures > Batch Ops but I think in terms of usability it is the
> exact
> > >opposite so let's make sure we have worked out how the API will be
> > used
> > >before deciding. In particular I think java Futures are often an
> > >uncomfortable half-way point since calling get() and blocking the
> > > thread is
> > >often not what you want for chaining sequences of operations in a
> > truly
> > >async way, so 99% of people just use the future as a way to batch
> > calls.
> > >
> >
> > We should definitely figure out how the APIs are going to be used before
> > deciding. I agree that callbacks are definitely painful and there's
> little
> > reason to expose them in a modern API unless it's meant to be very low
> > level. When it comes to Futures, I think it's important to distinguish
> what
> > is available in Java 7 and below versus what is available from Java 8
> > onwards. CompletableFuture makes it much easier to compose/chain
> operations
> > (in a similar vein to java.util.Stream, our own Streams API, etc.) and it
> > gives you the ability to register callbacks if you really want to
> (avoiding
> > the somewhat odd situation in the Producer where we return a Future _and_
> > allow you to pass a callback).
> >
> >
> > >- Personally I don't think splitting the admin methods up actually
> > makes
> > >things more usable. It just makes you have to dig through our
> > > hierarchy. I
> > >think a flat class with a bunch of operations (like the consumer
> api)
> > is
> > >probably the easiest for people to grok and find things on. I am
> kind
> > > of a
> > >dumb PHP programmer at heart, though.
> > >
> >
> > I am not sure it's fair to compare the AdminClient with the Consumer. The
> > former has APIs for a bunch of unrelated APIs (topics, ACLs, 

[jira] [Created] (KAFKA-4744) Streams_bounce test failing occasionally

2017-02-08 Thread Eno Thereska (JIRA)
Eno Thereska created KAFKA-4744:
---

 Summary: Streams_bounce test failing occasionally
 Key: KAFKA-4744
 URL: https://issues.apache.org/jira/browse/KAFKA-4744
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 0.10.2.0
Reporter: Eno Thereska
 Fix For: 0.10.3.0


The test occasionally fails, e.g., in 
https://jenkins.confluent.io/job/system-test-kafka/499/console.

The message is:
kafkatest.tests.streams.streams_bounce_test.StreamsBounceTest.test_bounce: 
FAIL: Streams Test process on ubuntu@worker5 took too long to exit
Traceback (most recent call last):
  File 
"/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/tests/runner_client.py",
 line 123, in run
data = self.run_test()
  File 
"/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/tests/runner_client.py",
 line 176, in run_test
return self.test_context.function(self.test)
  File 
"/var/lib/jenkins/workspace/system-test-kafka/kafka/tests/kafkatest/tests/streams/streams_bounce_test.py",
 line 72, in test_bounce
self.processor1.stop()
  File 
"/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/services/service.py",
 line 255, in stop
self.stop_node(node)
  File 
"/var/lib/jenkins/workspace/system-test-kafka/kafka/tests/kafkatest/services/streams.py",
 line 77, in stop_node
wait_until(lambda: not node.account.alive(pid), timeout_sec=60, 
err_msg="Streams Test process on " + str(node.account) + " took too long to 
exit")
  File 
"/var/lib/jenkins/workspace/system-test-kafka/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.6.0-py2.7.egg/ducktape/utils/util.py",
 line 36, in wait_until
raise TimeoutError(err_msg)
TimeoutError: Streams Test process on ubuntu@worker5 took too long to exit


Looking at the logs it looks like the test succeeded, so it might be that we 
simply need to increase the time we wait for the process to exit.





--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Assigned] (KAFKA-4741) Memory leak in RecordAccumulator.append

2017-02-08 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4741?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma reassigned KAFKA-4741:
--

Assignee: Satish Duggana

Done.

> Memory leak in RecordAccumulator.append
> ---
>
> Key: KAFKA-4741
> URL: https://issues.apache.org/jira/browse/KAFKA-4741
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Satish Duggana
>Assignee: Satish Duggana
> Fix For: 0.10.3.0
>
>
> RecordAccumulator allocates a `ByteBuffer` from the free memory pool. This 
> buffer should be deallocated whenever the append path throws an exception. 
> I added TODO comments in the code below for the cases where that buffer 
> should be deallocated.
> {code:title=RecordAccumulator.java|borderStyle=solid}
> ByteBuffer buffer = free.allocate(size, maxTimeToBlock);
> synchronized (dq) {
>     // Need to check if producer is closed again after grabbing the dequeue lock.
>     if (closed)
>         // todo buffer should be cleared.
>         throw new IllegalStateException("Cannot send after the producer is closed.");
>     // todo buffer should be cleared up when tryAppend throws an Exception
>     RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
>     if (appendResult != null) {
>         // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
>         free.deallocate(buffer);
>         return appendResult;
>     }
> {code}
> I will raise a PR for this soon.
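
One way to address it (an illustrative sketch only, reusing the identifiers
from the snippet above; the actual patch in PR 2509 may differ in detail):

{code:java}
// Illustrative sketch: return the buffer to the pool on every path that does not end up using it
ByteBuffer buffer = free.allocate(size, maxTimeToBlock);
boolean bufferUsed = false;
try {
    synchronized (dq) {
        if (closed)
            throw new IllegalStateException("Cannot send after the producer is closed.");
        RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
        if (appendResult != null)
            return appendResult;   // somebody else found us a batch; buffer is released in finally
        // ... otherwise create a new batch backed by 'buffer' ...
        bufferUsed = true;
    }
} finally {
    if (!bufferUsed)
        free.deallocate(buffer);   // covers the closed-producer case and any exception from tryAppend
}
{code}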



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4741) Memory leak in RecordAccumulator.append

2017-02-08 Thread Satish Duggana (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4741?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15858059#comment-15858059
 ] 

Satish Duggana commented on KAFKA-4741:
---

[~ijuma] Thanks for merging the PR. Can you set me as assignee for this JIRA? 
It did not allow me to do that.

> Memory leak in RecordAccumulator.append
> ---
>
> Key: KAFKA-4741
> URL: https://issues.apache.org/jira/browse/KAFKA-4741
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Satish Duggana
> Fix For: 0.10.3.0
>
>
> RecordAccumulator allocates a `ByteBuffer` from the free memory pool. This 
> buffer should be deallocated whenever the append path throws an exception. 
> I added TODO comments in the code below for the cases where that buffer 
> should be deallocated.
> {code:title=RecordAccumulator.java|borderStyle=solid}
> ByteBuffer buffer = free.allocate(size, maxTimeToBlock);
> synchronized (dq) {
>     // Need to check if producer is closed again after grabbing the dequeue lock.
>     if (closed)
>         // todo buffer should be cleared.
>         throw new IllegalStateException("Cannot send after the producer is closed.");
>     // todo buffer should be cleared up when tryAppend throws an Exception
>     RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
>     if (appendResult != null) {
>         // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
>         free.deallocate(buffer);
>         return appendResult;
>     }
> {code}
> I will raise a PR for this soon.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


FINAL REMINDER: CFP for ApacheCon closes February 11th

2017-02-08 Thread Rich Bowen
Dear Apache Enthusiast,

This is your FINAL reminder that the Call for Papers (CFP) for ApacheCon
Miami is closing this weekend - February 11th. This is your final
opportunity to submit a talk for consideration at this event.

This year, we are running several mini conferences in conjunction with
the main event, so if you're submitting for one of those events, please
pay attention to the instructions below.

Apache: Big Data
* Event information:
http://events.linuxfoundation.org/events/apache-big-data-north-america
* CFP:
http://events.linuxfoundation.org/events/apache-big-data-north-america/program/cfp

Apache: IoT (Internet of Things)
* Event Information: http://us.apacheiot.org/
* CFP -
http://events.linuxfoundation.org/events/apachecon-north-america/program/cfp
(Indicate 'IoT' in the Target Audience field)

CloudStack Collaboration Conference
* Event information: http://us.cloudstackcollab.org/
* CFP -
http://events.linuxfoundation.org/events/apachecon-north-america/program/cfp
(Indicate 'CloudStack' in the Target Audience field)

FlexJS Summit
* Event information - http://us.apacheflexjs.org/
* CFP -
http://events.linuxfoundation.org/events/apachecon-north-america/program/cfp
(Indicate 'Flex' in the Target Audience field)

TomcatCon
* Event information - https://tomcat.apache.org/conference.html
* CFP -
http://events.linuxfoundation.org/events/apachecon-north-america/program/cfp
(Indicate 'Tomcat' in the Target Audience field)

All other topics and projects
* Event information -
http://events.linuxfoundation.org/events/apachecon-north-america/program/about
* CFP -
http://events.linuxfoundation.org/events/apachecon-north-america/program/cfp

Admission to any of these events also grants you access to all of the
others.

Thanks, and we look forward to seeing you in Miami!

-- 
Rich Bowen
VP Conferences, Apache Software Foundation
rbo...@apache.org
Twitter: @apachecon



(You are receiving this email because you are subscribed to a dev@ or
users@ list of some Apache Software Foundation project. If you do not
wish to receive email from these lists any more, you must follow that
list's unsubscription procedure. View the headers of this message for
unsubscription instructions.)


[jira] [Commented] (KAFKA-4740) Using new consumer API with a Deserializer that throws SerializationException can lead to infinite loop

2017-02-08 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15858013#comment-15858013
 ] 

Ismael Juma commented on KAFKA-4740:


This is a tricky one. It may be worth prototyping the option where we return 
all the data up to the record that failed deserialization and throw the 
exception in the next poll. Maybe the implementation is not so bad. [~yabon], 
would you be willing to do that?
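
In the meantime, a possible client-side mitigation (an illustrative sketch
only, not the fix being discussed here, and reusing the names from the snippet
in the description) is to consume raw bytes and do the type conversion in
application code, so that {{poll()}} itself never throws:

{code:java}
IntegerDeserializer valueDeserializer = new IntegerDeserializer();
try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(
        consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
    consumer.subscribe(Arrays.asList("topic"));
    while (!doStop) {
        for (ConsumerRecord<byte[], byte[]> record : consumer.poll(1000)) {
            try {
                Integer value = valueDeserializer.deserialize(record.topic(), record.value());
                logger.info("offset: {}, value: {}", record.offset(), value);
            } catch (SerializationException e) {
                logger.warn("Skipping record at offset {} that cannot be deserialized", record.offset(), e);
            }
        }
    }
}
{code}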

> Using new consumer API with a Deserializer that throws SerializationException 
> can lead to infinite loop
> ---
>
> Key: KAFKA-4740
> URL: https://issues.apache.org/jira/browse/KAFKA-4740
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1, 0.10.1.0, 0.10.1.1
> Environment: Kafka broker 0.10.1.1 (but this bug is not dependent on 
> the broker version)
> Kafka clients 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1, 0.10.1.0, 0.10.1.1
>Reporter: Sébastien Launay
>Priority: Critical
>
> The old consumer supports deserializing records into typed objects and throws 
> a {{SerializationException}} through {{MessageAndMetadata#key()}} and 
> {{MessageAndMetadata#message()}} that can be caught by the client \[1\].
> When using the new consumer API with kafka-clients version < 0.10.0.1, such 
> an exception is swallowed by the {{NetworkClient}} class and results in an 
> infinite loop which the client has no control over, like:
> {noformat}
> DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Resetting offset 
> for partition test2-0 to earliest offset.
> DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Fetched offset 0 
> for partition test2-0
> ERROR org.apache.kafka.clients.NetworkClient - Uncaught error in request 
> completion:
> org.apache.kafka.common.errors.SerializationException: Size of data received 
> by IntegerDeserializer is not 4
> ERROR org.apache.kafka.clients.NetworkClient - Uncaught error in request 
> completion:
> org.apache.kafka.common.errors.SerializationException: Size of data received 
> by IntegerDeserializer is not 4
> ...
> {noformat}
> Thanks to KAFKA-3977, this has been partially fixed in 0.10.1.0 but another 
> issue still remains.
> Indeed, the client can now catch the {{SerializationException}} but the next 
> call to {{Consumer#poll(long)}} will throw the same exception indefinitely.
> The following snippet (full example available on Github \[2\] for most 
> released kafka-clients versions):
> {code:java}
> try (KafkaConsumer<String, Integer> kafkaConsumer = new KafkaConsumer<>(
>         consumerConfig, new StringDeserializer(), new IntegerDeserializer())) {
>     kafkaConsumer.subscribe(Arrays.asList("topic"));
>     // Will run till the shutdown hook is called
>     while (!doStop) {
>         try {
>             ConsumerRecords<String, Integer> records = kafkaConsumer.poll(1000);
>             if (!records.isEmpty()) {
>                 logger.info("Got {} messages", records.count());
>                 for (ConsumerRecord<String, Integer> record : records) {
>                     logger.info("Message with partition: {}, offset: {}, key: {}, value: {}",
>                             record.partition(), record.offset(), record.key(), record.value());
>                 }
>             } else {
>                 logger.info("No messages to consume");
>             }
>         } catch (SerializationException e) {
>             logger.warn("Failed polling some records", e);
>         }
>     }
> }
> {code}
> when run with the following records (third record has an invalid Integer 
> value):
> {noformat}
> printf "\x00\x00\x00\x00\n" | bin/kafka-console-producer.sh --broker-list 
> localhost:9092 --topic topic
> printf "\x00\x00\x00\x01\n" | bin/kafka-console-producer.sh --broker-list 
> localhost:9092 --topic topic
> printf "\x00\x00\x00\n" | bin/kafka-console-producer.sh --broker-list 
> localhost:9092 --topic topic
> printf "\x00\x00\x00\x02\n" | bin/kafka-console-producer.sh --broker-list 
> localhost:9092 --topic topic
> {noformat}
> will produce the following logs:
> {noformat}
> INFO  consumer.Consumer - Got 2 messages
> INFO  consumer.Consumer - Message with partition: 0, offset: 0, key: null, 
> value: 0
> INFO  consumer.Consumer - Message with partition: 0, offset: 1, key: null, 
> value: 1
> WARN  consumer.Consumer - Failed polling some records
> org.apache.kafka.common.errors.SerializationException: Error deserializing 
> key/value for partition topic-0 at offset 2
> Caused by: org.apache.kafka.common.errors.SerializationException: Size of 
> data received by IntegerDeserializer is not 4
> WARN  consumer.Consumer - Failed polling some records
> 

[jira] [Commented] (KAFKA-4741) Memory leak in RecordAccumulator.append

2017-02-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4741?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15857996#comment-15857996
 ] 

ASF GitHub Bot commented on KAFKA-4741:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2509


> Memory leak in RecordAccumulator.append
> ---
>
> Key: KAFKA-4741
> URL: https://issues.apache.org/jira/browse/KAFKA-4741
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Satish Duggana
> Fix For: 0.10.3.0
>
>
> RecordAccumulator allocates a `ByteBuffer` from the free memory pool. This 
> buffer should be deallocated whenever the append path throws an exception. 
> I added TODO comments in the code below for the cases where that buffer 
> should be deallocated.
> {code:title=RecordAccumulator.java|borderStyle=solid}
> ByteBuffer buffer = free.allocate(size, maxTimeToBlock);
> synchronized (dq) {
>     // Need to check if producer is closed again after grabbing the dequeue lock.
>     if (closed)
>         // todo buffer should be cleared.
>         throw new IllegalStateException("Cannot send after the producer is closed.");
>     // todo buffer should be cleared up when tryAppend throws an Exception
>     RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
>     if (appendResult != null) {
>         // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
>         free.deallocate(buffer);
>         return appendResult;
>     }
> {code}
> I will raise a PR for this soon.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] kafka pull request #2509: KAFKA-4741 Fix for buffer leaks in RecordAccumulat...

2017-02-08 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2509


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Resolved] (KAFKA-4741) Memory leak in RecordAccumulator.append

2017-02-08 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4741?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma resolved KAFKA-4741.

   Resolution: Fixed
Fix Version/s: 0.10.3.0

Issue resolved by pull request 2509
[https://github.com/apache/kafka/pull/2509]

> Memory leak in RecordAccumulator.append
> ---
>
> Key: KAFKA-4741
> URL: https://issues.apache.org/jira/browse/KAFKA-4741
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Satish Duggana
> Fix For: 0.10.3.0
>
>
> RecordAccumulator allocates a `ByteBuffer` from the free memory pool. This 
> buffer should be deallocated whenever the append path throws an exception. 
> I added TODO comments in the code below for the cases where that buffer 
> should be deallocated.
> {code:title=RecordAccumulator.java|borderStyle=solid}
> ByteBuffer buffer = free.allocate(size, maxTimeToBlock);
> synchronized (dq) {
>     // Need to check if producer is closed again after grabbing the dequeue lock.
>     if (closed)
>         // todo buffer should be cleared.
>         throw new IllegalStateException("Cannot send after the producer is closed.");
>     // todo buffer should be cleared up when tryAppend throws an Exception
>     RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
>     if (appendResult != null) {
>         // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
>         free.deallocate(buffer);
>         return appendResult;
>     }
> {code}
> I will raise a PR for this soon.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [VOTE] KIP-48 Support for delegation tokens as an authentication mechanism

2017-02-08 Thread Manikumar
Hi Roger,

In the current proposal, we only allow a user to get a delegation token for
that user themselves.
Anyone who gets that token can impersonate the user on the broker.

Yes, in the future we can extend the support to allow a user to acquire
delegation tokens for other users.

Please refer to the DISCUSS mail thread for the impersonation-related discussion.

Thanks,
Manikumar

On Wed, Feb 8, 2017 at 8:37 AM, Roger Hoover  wrote:

> Hi Jun,
>
> How does it allow impersonation at the connection level?  Looking at the
> KIP, the DelegationTokenRequest does not have an "Owner" field that can be
> set.   The owner field of the DelegationTokenResponse says it's the "Kafka
> Principal which requested the delegation token".  For impersonation, don't
> we need to be able to get tokens for other users besides the one making the
> request?
>
> Thanks,
>
> Roger
>
> On Tue, Feb 7, 2017 at 6:45 PM, Jun Rao  wrote:
>
> > Hi, Roger,
> >
> > Just to clarify. This KIP already allows you to do impersonation at the
> > connection level. Are you talking about impersonation at the request
> level?
> >
> > Thanks,
> >
> > Jun
> >
> > On Tue, Feb 7, 2017 at 5:53 PM, Roger Hoover 
> > wrote:
> >
> > > Just wondering...how difficult would be it be to later add
> impersonation
> > (
> > > https://issues.apache.org/jira/browse/KAFKA-3712)?  One use case would
> > be
> > > a
> > > Kafka admin UI that would take action on the cluster on behalf of
> different
> > > users. I suppose we could later add an "effectiveUserId" (in Unix
> > > terminology) to the token details?
> > >
> > > On Tue, Feb 7, 2017 at 5:25 PM, Grant Henke 
> wrote:
> > >
> > > > +1 from me as well.
> > > >
> > > > On Tue, Feb 7, 2017 at 7:10 PM, Jason Gustafson 
> > > > wrote:
> > > >
> > > > > Looks like a great proposal! I noticed that key rotation is not
> > > included.
> > > > > That may be reasonable for the initial work, but it might be nice
> to
> > > > share
> > > > > some thoughts on how that might work in the future. For example, I
> > > could
> > > > > imagine delegation.token.master.key could be a list, which would
> > allow
> > > > > users to support both a new and old key at the same time while
> > clients
> > > > are
> > > > > upgrading keys.
> > > > >
> > > > > -Jason
> > > > >
> > > > > On Tue, Feb 7, 2017 at 4:42 PM, Gwen Shapira 
> > > wrote:
> > > > >
> > > > > > Read the KIP again and I think it looks good.
> > > > > >
> > > > > > +1 from me.
> > > > > >
> > > > > > On Tue, Feb 7, 2017 at 3:05 PM, Jun Rao 
> wrote:
> > > > > > > Hi, Mani,
> > > > > > >
> > > > > > > If a token expires, then every broker will potentially try to
> > > delete
> > > > it
> > > > > > > around the same time, but only one will succeed. So, we will
> have
> > > to
> > > > > deal
> > > > > > > with failures in that case? Another way is to let just one
> broker
> > > > (say,
> > > > > > the
> > > > > > > controller) deletes expired tokens.
> > > > > > >
> > > > > > > It would also be helpful for others to give feedback on this
> KIP.
> > > > > Rajini,
> > > > > > > Gwen, Ismael?
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > Jun
> > > > > > >
> > > > > > >
> > > > > > > On Sun, Feb 5, 2017 at 9:54 AM, Manikumar <
> > > manikumar.re...@gmail.com
> > > > >
> > > > > > wrote:
> > > > > > >
> > > > > > >> Hi Jun,
> > > > > > >>
> > > > > > >>  Please see the replies inline.
> > > > > > >>
> > > > > > >>
> > > > > > >> > >
> > > > > > >> > > Only one broker does the deletion. Broker updates the
> > > expiration
> > > > > in
> > > > > > its
> > > > > > >> > > local cache
> > > > > > >> > > and on zookeeper so other brokers also get notified and
> > their
> > > > > cache
> > > > > > >> > > statuses are updated as well.
> > > > > > >> > >
> > > > > > >> > >
> > > > > > >> > Which broker does the deletion?
> > > > > > >> >
> > > > > > >>
> > > > > > >> Any broker can handle the create/expire/renew/describe
> > > > delegationtoken
> > > > > > >> requests.
> > > > > > >> changes are propagated through zk notifications.  Every broker
> > is
> > > > > > >> responsible for
> > > > > > >> expiring the tokens. This check can be done during request
> > > handling
> > > > > time
> > > > > > >> and/or
> > > > > > >> during token authentication time.
> > > > > > >>
> > > > > > >>
> > > > > > >> >
> > > > > > >> >
> > > > > > >> > 110. The diagrams in the wiki still show MD5 digest. Could
> you
> > > > > change
> > > > > > it
> > > > > > >> to
> > > > > > >> > SCRAM?
> > > > > > >> >
> > > > > > >> >
> > > > > > >>   Updated the diagram.
> > > > > > >>
> > > > > > >>
> > > > > > >>
> > > > > > >> Thanks,
> > > > > > >> Manikumar
> > > > > > >>
> > > > > > >>
> > > > > > >>
> > > > > > >>
> > > > > > >> >
> > > > > > >> >
> > > > > > >> > >
> > > > > > >> > > Thanks.
> > > > > > >> > > Manikumar
> > > > > > >> > >
> > > > > > >> > >
> > > > > > >> > > 

Re: KIP-122: Add a tool to Reset Consumer Group Offsets

2017-02-08 Thread Jorge Esteban Quilcate Otoya
Thanks for the feedback!

@Onur, @Gwen:

Agree. Actually, in the first draft I considered having it inside
´kafka-consumer-groups.sh´, but I decided to propose it as a standalone tool
to describe it clearly and keep it focused on the reset functionality.

But now that you mention it, it does make sense to have it in
´kafka-consumer-groups.sh´. What would be a consistent way to introduce it?

Maybe something like this:

´kafka-consumer-groups.sh --reset-offset --generate --group cg1 --topics t1
--reset-from 2017-01-01T00:00:00.000 --output plan.json´

´kafka-consumer-groups.sh --reset-offset --verify --reset-json-file
plan.json´

´kafka-consumer-groups.sh --reset-offset --execute --reset-json-file
plan.json´

´kafka-consumer-groups.sh --reset-offset --generate-and-execute --group cg1
--topics t1 --reset-from 2017-01-01T00:00:00.000´

@Gwen:

> It looks exactly like the replica assignment tool

It was influenced by it ;-) I use the generate-verify-execute process here to
make sure the user is aware of the result of this operation. At the
beginning we considered only adding a couple of options to the Consumer Group
Command:

--rewind-to-timestamp and --rewind-to-period

@Onur:

> You can actually get away with overriding while members of the group are live
with method 2 by using group information from DescribeGroupsRequest.

Does this mean that we need to have the Consumer Group stopped before
executing, and start a new consumer internally to do this? If so, we won't be
able to execute a reset while the Consumer Group is active? (trying to relate
it to @Dong's 5th question)

@Dong:

> Should we allow user to use wildcard to reset offset of all groups for a
given topic as well?

I haven't thought about this scenario. It could be interesting. Following the
recommendation to add it to the Consumer Group Command, the Group argument
would in this case be optional if there is only one topic. I don't think it
would be that useful for multiple topics.

> Should we allow user to specify timestamp per topic partition in the json
file as well?

I don't think this should come directly from the tool, but if a Reset Plan is
generated, and the user wants to set the offset for a specific partition to
another offset (possibly based on another timestamp), and execute it, that
will be up to her/him.

> Should the script take some credential file to make sure that this
operation is authenticated given the potential impact of this operation?

Haven't tried to secure brokers yet, but the tool should support
authorization if it's enabled in the broker.

> Should we provide constant to reset committed offset to earliest/latest
offset of a partition, e.g. -1 indicates earliest offset and -2 indicates
latest offset.

I will go for something like ´--reset-to-earliest´ and ´--reset-to-latest´
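
For example, following the hypothetical syntax above:

´kafka-consumer-groups.sh --reset-offset --execute --group cg1 --topics t1
--reset-to-earliest´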

> Should we allow dynamic change of the committed offset when consumers are
running, such that the consumer will seek to the newly committed offset and
start consuming from there?

Not sure about this. I would recommend keeping it simple and asking the user
to stop consumers first. But I would consider it if the trade-offs are
clear.

@Matthias

Added :). And thanks a lot for your help to define this KIP!



El mié., 8 feb. 2017 a las 7:47, Gwen Shapira ()
escribió:

> As long as the CLI is a bit consistent? Like, not just adding 3
> arguments and a JSON parser to the existing tool, right?
>
> On Tue, Feb 7, 2017 at 10:29 PM, Onur Karaman
>  wrote:
> > I think it makes sense to just add the feature to
> kafka-consumer-groups.sh
> >
> > On Tue, Feb 7, 2017 at 10:24 PM, Gwen Shapira  wrote:
> >
> >> Thanks for the KIP. I'm super happy about adding the capability.
> >>
> >> I hate the interface, though. It looks exactly like the replica
> >> assignment tool. A tool everyone loves so much that there are multiple
> >> projects, open and closed, that try to fix it.
> >>
> >> Can we swap it with something that looks a bit more like the consumer
> >> group tool? or the kafka streams reset tool? Consistency is helpful in
> >> such cases. I spent some time learning existing tools and learning yet
> >> another one is a deterrent.
> >>
> >> Gwen
> >>
> >>
> >>
> >> On Tue, Feb 7, 2017 at 6:43 PM, Jorge Esteban Quilcate Otoya
> >>  wrote:
> >> > Hi all,
> >> >
> >> > I would like to propose a KIP to Add a tool to Reset Consumer Group
> >> Offsets.
> >> >
> >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> >> 122%3A+Add+a+tool+to+Reset+Consumer+Group+Offsets
> >> >
> >> > Please, take a look at the proposal and share your feedback.
> >> >
> >> > Thanks,
> >> > Jorge.
> >>
> >>
> >>
> >> --
> >> Gwen Shapira
> >> Product Manager | Confluent
> >> 650.450.2760 | @gwenshap
> >> Follow us: Twitter | blog
> >>
>
>
>
> --
> Gwen Shapira
> Product Manager | Confluent
> 650.450.2760 | @gwenshap
> Follow us: Twitter | blog
>


Re: [DISCUSS] KIP-84: Support SASL/SCRAM mechanisms

2017-02-08 Thread Ismael Juma
Hi Roger,

Sorry for the delay. SCRAM is specified by:

https://tools.ietf.org/html/rfc5802

The following quote is relevant:

A SCRAM mechanism name is a string "SCRAM-" followed by the
> uppercased name of the underlying hash function taken from the IANA
> "Hash Function Textual Names" registry (see http://www.iana.org),
> optionally followed by the suffix "-PLUS" (see below)


And:

"md2" 1.2.840.113549.2.2 [RFC3279]
> "md5" 1.2.840.113549.2.5 [RFC3279]
> "sha-1" 1.3.14.3.2.26 [RFC3279]
> "sha-224" 2.16.840.1.101.3.4.2.4 [RFC4055]
> "sha-256" 2.16.840.1.101.3.4.2.1 [RFC4055]
> "sha-384" 2.16.840.1.101.3.4.2.2 [RFC4055]
> "sha-512" 2.16.840.1.101.3.4.2.3 [RFC4055]


https://www.iana.org/assignments/hash-function-text-names/hash-function-text-names.xhtml

As you can see, bcrypt is not an option for the current spec. The naming of the
mechanisms would be a bit misleading if support for bcrypt were added
(SCRAM-PBKDF2-SHA512, SCRAM-BCRYPT*, etc. would be better).
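
For reference, the two mechanisms added by this KIP would typically be enabled
on the broker with something like the following (standard SASL config, shown
here only as an illustration):

sasl.enabled.mechanisms=SCRAM-SHA-256,SCRAM-SHA-512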

Does that make sense?

Ismael

On Tue, Jan 24, 2017 at 7:26 PM, Roger Hoover 
wrote:

> Thanks, Ismael.  Just curious, why does it not make sense to do bcrypt
> in the context of SCRAM?
>
> On Mon, Jan 23, 2017 at 3:54 PM, Ismael Juma  wrote:
>
> > Hi Roger,
> >
> > SCRAM uses the PBKDF2 mechanism, here's a comparison between PBKDF2 and
> > bcrypt:
> >
> > http://security.stackexchange.com/questions/4781/do-any-secu
> > rity-experts-recommend-bcrypt-for-password-storage/6415#6415
> >
> > It may be worth supporting bcrypt, but not sure it would make sense to do
> > it in the context of SCRAM.
> >
> > A minor correction: the KIP includes SCRAM-SHA-256 and SCRAM-SHA-512 (not
> > SCRAM-SHA-1).
> >
> > Ismael
> >
> > On Mon, Jan 23, 2017 at 10:49 PM, Roger Hoover 
> > wrote:
> >
> > > Sorry for the late question but is there a reason to choose SHA-1 and
> > > SHA-256 instead of bcrypt?
> > >
> > > https://codahale.com/how-to-safely-store-a-password/
> > >
> > > On Fri, Nov 11, 2016 at 5:30 AM, Rajini Sivaram <
> > > rajinisiva...@googlemail.com> wrote:
> > >
> > > > I think all the comments and suggestions on this thread have now been
> > > > incorporated into the KIP. If there are no objections, I will start
> the
> > > > voting process on Monday.
> > > >
> > > > Regards,
> > > >
> > > > Rajini
> > > >
> > > > On Tue, Nov 8, 2016 at 9:20 PM, Rajini Sivaram <
> > > > rajinisiva...@googlemail.com
> > > > > wrote:
> > > >
> > > > > Jun,
> > > > >
> > > > > Have added a sub-section on delegation token support to the KIP.
> > > > >
> > > > > Thank you,
> > > > >
> > > > > Rajini
> > > > >
> > > > > On Tue, Nov 8, 2016 at 8:07 PM, Jun Rao  wrote:
> > > > >
> > > > >> Hi, Rajini,
> > > > >>
> > > > >> That makes sense. Could you document this potential future
> extension
> > > in
> > > > >> the
> > > > >> KIP?
> > > > >>
> > > > >> Jun
> > > > >>
> > > > >> On Tue, Nov 8, 2016 at 11:17 AM, Rajini Sivaram <
> > > > >> rajinisiva...@googlemail.com> wrote:
> > > > >>
> > > > >> > Jun,
> > > > >> >
> > > > >> > 11. SCRAM messages have an optional extensions field which is a
> > list
> > > > of
> > > > >> > key=value pairs. We can add an extension key to the first client
> > > > >> message to
> > > > >> > indicate delegation token. Broker can then obtain credentials
> and
> > > > >> principal
> > > > >> > using a different code path for delegation tokens.
> > > > >> >
> > > > >> > On Tue, Nov 8, 2016 at 6:38 PM, Jun Rao 
> wrote:
> > > > >> >
> > > > >> > > Magnus,
> > > > >> > >
> > > > >> > > Thanks for the input. If you don't feel strongly the need to
> > bump
> > > up
> > > > >> the
> > > > >> > > version of SaslHandshake, we can leave the version unchanged.
> > > > >> > >
> > > > >> > > Rajini,
> > > > >> > >
> > > > >> > > 11. Yes, we could send the HMAC as the SCRAM password for the
> > > > >> delegation
> > > > >> > > token. Do we need something to indicate that this SCRAM token
> is
> > > > >> special
> > > > >> > > (i.e., delegation token) so that we can generate the correct
> > > > >> > > KafkaPrincipal? The delegation token logic can be added
> later. I
> > > am
> > > > >> > asking
> > > > >> > > just so that we have enough in the design of SCRAM to add the
> > > > >> delegation
> > > > >> > > token logic later.
> > > > >> > >
> > > > >> > > Thanks,
> > > > >> > >
> > > > >> > > Jun
> > > > >> > >
> > > > >> > >
> > > > >> > > On Tue, Nov 8, 2016 at 4:42 AM, Rajini Sivaram <
> > > > >> > > rajinisiva...@googlemail.com
> > > > >> > > > wrote:
> > > > >> > >
> > > > >> > > > Hi Jun,
> > > > >> > > >
> > > > >> > > > 10. *s=* and *i=* come from the SCRAM
> > standard
> > > > >> (they
> > > > >> > > are
> > > > >> > > > transferred during SCRAM auth). Scram messages look like
> (for
> > > > >> example)
> > > > >> > > > *r=,s=,i=*. StoredKey and
> ServerKey
> > > and
> > > > >> not
> > > > >> > > > transferred in SCRAM messages, so I picked two keys that 

Jenkins build is back to normal : kafka-trunk-jdk8 #1257

2017-02-08 Thread Apache Jenkins Server
See 



[GitHub] kafka pull request #2516: MINOR: add GlobalKTable doc to streams.html

2017-02-08 Thread dguy
GitHub user dguy opened a pull request:

https://github.com/apache/kafka/pull/2516

MINOR: add GlobalKTable doc to streams.html

Update streams.html with GlobalKTable docs

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dguy/kafka global-tables-doc

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2516.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2516


commit c049feb31f5686f47a2da214cc92d5783870e33e
Author: Damian Guy 
Date:   2017-02-08T10:15:59Z

add global ktable docs




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Jenkins build is back to normal : kafka-trunk-jdk7 #1920

2017-02-08 Thread Apache Jenkins Server
See 



Re: [VOTE] KIP-48 Support for delegation tokens as an authentication mechanism

2017-02-08 Thread Manikumar
Hi Jun,


> If a token expires, then every broker will potentially try to delete it
> around the same time, but only one will succeed. So, we will have to deal
> with failures in that case? Another way is to let just one broker (say, the
> controller) deletes expired tokens.
>
>
 Agreed, we can run the token expiry check thread as part of the controller
broker.
 Will update the KIP.
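
Purely as an illustration of such a controller-side check (all names below are
hypothetical, not the implementation), it could be scheduled along these lines:

    // Hypothetical sketch: only the controller deletes expired tokens
    ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    scheduler.scheduleAtFixedRate(() -> {
        if (isActiveController()) {
            for (TokenDetails token : tokenCache.allTokens()) {
                if (token.expiryTimestamp() < System.currentTimeMillis())
                    deleteTokenFromZk(token.tokenId());   // other brokers update their caches via ZK notification
            }
        }
    }, 1, 1, TimeUnit.MINUTES);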


Thanks,
Manikumar


>
> On Sun, Feb 5, 2017 at 9:54 AM, Manikumar 
> wrote:
>
> > Hi Jun,
> >
> >  Please see the replies inline.
> >
> >
> > > >
> > > > Only one broker does the deletion. Broker updates the expiration in
> its
> > > > local cache
> > > > and on zookeeper so other brokers also get notified and their cache
> > > > statuses are updated as well.
> > > >
> > > >
> > > Which broker does the deletion?
> > >
> >
> > Any broker can handle the create/expire/renew/describe delegationtoken
> > requests.
> > changes are propagated through zk notifications.  Every broker is
> > responsible for
> > expiring the tokens. This check can be done during request handling time
> > and/or
> > during token authentication time.
> >
> >
> > >
> > >
> > > 110. The diagrams in the wiki still show MD5 digest. Could you change
> it
> > to
> > > SCRAM?
> > >
> > >
> >   Updated the diagram.
> >
> >
> >
> > Thanks,
> > Manikumar
> >
> >
> >
> >
> > >
> > >
> > > >
> > > > Thanks.
> > > > Manikumar
> > > >
> > > >
> > > > >
> > > > > On Fri, Dec 23, 2016 at 9:26 AM, Manikumar <
> > manikumar.re...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi,
> > > > > >
> > > > > > I would like to initiate the vote on KIP-48:
> > > > > >
> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-48+
> > > > > > Delegation+token+support+for+Kafka
> > > > > >
> > > > > >
> > > > > > Thanks,
> > > > > > Manikumar
> > > > > >
> > > > >
> > > >
> > >
> >
>


Re: [DISCUSS] KIP-116 - Add State Store Checkpoint Interval Configuration

2017-02-08 Thread Damian Guy
Matthias,

Fair point. I'll update the KIP.
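
For illustration of the interplay with commit.interval.ms discussed in the
quoted thread below, using a hypothetical property name (the final name is
defined in the KIP):

    # checkpoint every 5th commit:
    commit.interval.ms=30000
    statestore.checkpoint.interval.ms=150000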
Thanks

On Wed, 8 Feb 2017 at 05:49 Matthias J. Sax  wrote:

> Damian,
>
> I am not strict about it either. However, if there is no advantage in
> disabling it, we might not want to allow it. This would have the
> advantage to guard users to accidentally switch it off.
>
> -Matthias
>
>
> On 2/3/17 2:03 AM, Damian Guy wrote:
> > Hi Matthias,
> >
> > It possibly doesn't make sense to disable it, but then I'm sure someone
> > will come up with a reason they don't want it!
> > I'm happy to change it such that the checkpoint interval must be > 0.
> >
> > Cheers,
> > Damian
> >
> > On Fri, 3 Feb 2017 at 01:29 Matthias J. Sax 
> wrote:
> >
> >> Thanks Damian.
> >>
> >> One more question: "Checkpointing is disabled if the checkpoint interval
> >> is set to a value <=0."
> >>
> >>
> >> Does it make sense to disable check pointing? What's the tradeoff here?
> >>
> >>
> >> -Matthias
> >>
> >>
> >> On 2/2/17 1:51 AM, Damian Guy wrote:
> >>> Hi Matthias,
> >>>
> >>> Thanks for the comments.
> >>>
> >>> 1. TBD - i need to do some performance tests and try and work out a
> >>> sensible default.
> >>> 2. Yes, you are correct. It could be a multiple of the
> >> commit.interval.ms.
> >>> But, that would also mean if you change the commit interval - say you
> >> lower
> >>> it, then you might also need to change the checkpoint setting (i.e., you
> >>> still only want to checkpoint every n minutes).
> >>>
> >>> On Wed, 1 Feb 2017 at 23:46 Matthias J. Sax 
> >> wrote:
> >>>
>  Thanks for the KIP Damian.
> 
>  I am wondering about two things:
> 
>  1. what should be the default value for the new parameter?
>  2. why is the new parameter provided in ms?
> 
>  About (2): because
> 
>  "the minimum checkpoint interval will be the value of
>  commit.interval.ms. In effect the actual checkpoint interval will be
> a
>  multiple of the commit interval"
> 
>  it might be easier to just use a parameter that is "number-of-commit
>  intervals".
> 
> 
>  -Matthias
> 
> 
>  On 2/1/17 7:29 AM, Damian Guy wrote:
> > Thanks for the comments Eno.
> > As for exactly once, I don't believe this matters as we are just
>  restoring
> > the change-log, i.e., the result of the aggregations that previously
> ran
> > etc. So once initialized the state store will be in the same state as
> >> it
> > was before.
> > Having the checkpoint in a kafka topic is not ideal as the state is
> per
> > kafka streams instance. So each instance would need to start with a
>  unique
> > id that is persistent.
> >
> > Cheers,
> > Damian
> >
> > On Wed, 1 Feb 2017 at 13:20 Eno Thereska 
> >> wrote:
> >
> >> As a follow up to my previous comment, have you thought about
> writing
>  the
> >> checkpoint to a topic instead of a local file? That would have the
> >> advantage that all metadata continues to be managed by Kafka, as
> well
> >> as
> >> fit with EoS. The potential disadvantage would be a slower latency,
>  however
> >> if it is periodic as you mention, I'm not sure that would be a show
>  stopper.
> >>
> >> Thanks
> >> Eno
> >>> On 1 Feb 2017, at 12:58, Eno Thereska 
> >> wrote:
> >>>
> >>> Thanks Damian, this is a good idea and will reduce the restore
> time.
> >> Looking forward, with exactly once and support for transactions in
>  Kafka, I
> >> believe we'll have to add some support for rolling back checkpoints,
>  e.g.,
> >> when a transaction is aborted. We need to be aware of that and
> ideally
> >> anticipate a bit those needs in the KIP.
> >>>
> >>> Thanks
> >>> Eno
> >>>
> >>>
>  On 1 Feb 2017, at 10:18, Damian Guy  wrote:
> 
>  Hi all,
> 
>  I would like to start the discussion on KIP-116:
> 
> 
> >>
> 
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-116+-+Add+State+Store+Checkpoint+Interval+Configuration
> 
>  Thanks,
>  Damian
> >>>
> >>
> >>
> >
> 
> 
> >>>
> >>
> >>
> >
>
>


Re: [DISCUSS] KIP-112: Handle disk failure for JBOD

2017-02-08 Thread Dong Lin
Hi all,

Thank you all for the helpful suggestions. I have updated the KIP to address
the comments received so far. See here
to read the changes to the KIP. Here is a summary of the changes:

- Updated the Proposed Change section to change the recovery steps. After
this change, the broker will also create replicas as long as all log
directories are working.
- Removed kafka-log-dirs.sh from this KIP since the user no longer needs to
use it to recover from bad disks.
- Explained how the znode controller_managed_state is managed in the Public
interface section.
- Explained what happens during controller failover, partition reassignment
and topic deletion in the Proposed Change section.
- Updated Future Work section to include the following potential
improvements
  - Let broker notify controller of ISR change and disk state change via
RPC instead of using zookeeper
  - Handle various failure scenarios (e.g. slow disk) on a case-by-case
basis. For example, we may want to detect slow disk and consider it as
offline.
  - Allow admin to mark a directory as bad so that it will not be used.
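
As a purely illustrative sketch of the broker-side handling described further
down in the quoted discussion (all names below are hypothetical, not the
actual implementation):

    try {
        log.append(records);                       // any I/O against the log directory
    } catch (IOException e) {
        markOffline(logDir);                       // mark the log dir and all its replicas offline
        zkUtils.createLogDirEventNotification();   // notify the controller via zookeeper
        // the error is also reported to the controller in the LeaderAndIsrResponse
    }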

Thanks,
Dong



On Tue, Feb 7, 2017 at 5:23 PM, Dong Lin  wrote:

> Hey Eno,
>
> Thanks much for the comment!
>
> I still think the complexity added to Kafka is justified by its benefit.
> Let me provide my reasons below.
>
> 1) The additional logic is easy to understand and thus its complexity
> should be reasonable.
>
> On the broker side, it needs to catch exceptions when accessing a log directory,
> mark the log directory and all its replicas as offline, notify the controller by
> writing the zookeeper notification path, and specify the error in the
> LeaderAndIsrResponse. On the controller side, it will listen to
> zookeeper for disk failure notifications, learn about offline replicas in
> the LeaderAndIsrResponse, and take offline replicas into consideration when
> electing leaders. It also marks replicas as created in zookeeper and uses that
> to determine whether a replica has been created.
>
> That is all the logic we need to add in Kafka. I personally feel this is
> easy to reason about.
>
> 2) The additional code is not much.
>
> I expect the code for KIP-112 to be around 1100 lines new code. Previously
> I have implemented a prototype of a slightly different design (see here
> )
> and uploaded it to github (see here
> ). The patch changed 33
> files, added 1185 lines and deleted 183 lines. The size of prototype patch
> is actually smaller than patch of KIP-107 (see here
> ) which is already accepted.
> The KIP-107 patch changed 49 files, added 1349 lines and deleted 141 lines.
>
> 3) Comparison with one-broker-per-multiple-volumes
>
> This KIP can improve the availability of Kafka in this case such that one
> failed volume doesn't bring down the entire broker.
>
> 4) Comparison with one-broker-per-volume
>
> If each volume maps to multiple disks, then we still have a similar problem:
> the broker will fail if any disk of the volume fails.
>
> If each volume maps to one disk, it means that we need to deploy 10
> brokers on a machine if the machine has 10 disks. I will explain the
> concern with this approach in order of their importance.
>
> - It would be weird to tell a Kafka user to deploy 50 brokers on a
> machine with 50 disks.
>
> - Whether the user deploys Kafka on a commercial cloud platform or in
> their own cluster, the size of the largest disk is usually
> limited. There will be scenarios where the user wants to increase broker
> capacity by having multiple disks per broker. This JBOD KIP makes that
> feasible without hurting availability due to a single disk failure.
>
> - Automatic load rebalance across disks will be easier and more flexible
> if one broker has multiple disks. This can be future work.
>
> - There is a performance concern when you deploy 10 brokers vs. 1 broker on
> one machine. The metadata of the cluster, including FetchRequest,
> ProduceResponse, MetadataRequest and so on, will all be 10X more. The
> packets-per-second will be 10X higher, which may limit performance if pps is
> the performance bottleneck. The number of sockets on the machine is 10X
> higher. And the number of replication threads will be 100X more. The impact
> will be more significant with an increasing number of disks per machine. Thus
> it will limit Kafka's scalability in the long term.
>
> Thanks,
> Dong
>
>
> On Tue, Feb 7, 2017 at 1:51 AM, Eno Thereska 
> wrote:
>
>> Hi Dong,
>>
>> To simplify the discussion today, on my part I'll zoom into one thing
>> only:
>>
>> - I'll discuss the options called below : "one-broker-per-disk" or
>> "one-broker-per-few-disks".
>>
>> - I completely buy the JBOD vs RAID arguments so there is no need to
>> discuss that part 

[jira] [Commented] (KAFKA-4733) Improve Streams Reset Tool console output

2017-02-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4733?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15857618#comment-15857618
 ] 

ASF GitHub Bot commented on KAFKA-4733:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2503


> Improve Streams Reset Tool console output
> -
>
> Key: KAFKA-4733
> URL: https://issues.apache.org/jira/browse/KAFKA-4733
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, tools
>Reporter: Matthias J. Sax
>Assignee: Gwen Shapira
>Priority: Minor
>  Labels: beginner, easyfix, newbie
> Fix For: 0.10.3.0
>
>
> Currently, the console output of {{bin/kafka-streams-application-reset.sh}} 
> is not helpful enough to users:
> - we should add a hint to clean up local state using 
> {{KafkaStreams#cleanUp()}}
> - we should clarify what to specify for each parameter (i.e., what is an input 
> topic, what is an intermediate topic)
> - we should clarify that it is not required to specify internal topics (and 
> what those are)
> - we should clarify what the tool does for the different topics, i.e., 
> seek+commit, delete, etc.
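
As an illustration of the cleanup hint above (standard Streams API usage,
shown here only as an example):

{code:java}
KStreamBuilder builder = new KStreamBuilder();
// ... build the topology ...
KafkaStreams streams = new KafkaStreams(builder, new StreamsConfig(props));
streams.cleanUp();   // wipes this instance's local state directory; call only while the instance is not running
streams.start();
{code}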



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Resolved] (KAFKA-4733) Improve Streams Reset Tool console output

2017-02-08 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4733?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-4733.
--
   Resolution: Fixed
Fix Version/s: 0.10.3.0

Issue resolved by pull request 2503
[https://github.com/apache/kafka/pull/2503]

> Improve Streams Reset Tool console output
> -
>
> Key: KAFKA-4733
> URL: https://issues.apache.org/jira/browse/KAFKA-4733
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, tools
>Reporter: Matthias J. Sax
>Assignee: Gwen Shapira
>Priority: Minor
>  Labels: beginner, easyfix, newbie
> Fix For: 0.10.3.0
>
>
> Currently, the console output of {{bin/kafka-streams-application-reset.sh}} 
> is not helpful enough to users:
> - we should add a hint to clean up local state using 
> {{KafkaStreams#cleanUp()}}
> - we should clarify what to specify for each parameter (i.e., what is an input 
> topic, what is an intermediate topic)
> - we should clarify that it is not required to specify internal topics (and 
> what those are)
> - we should clarify what the tool does for the different topics, i.e., 
> seek+commit, delete, etc.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] kafka pull request #2503: KAFKA-4733: Improve Streams Reset Tool console out...

2017-02-08 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2503


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---