Build failed in Jenkins: kafka-0.11.0-jdk7 #409

2018-12-02 Thread Apache Jenkins Server
See 


Changes:

[wangguoz] MINOR: improve QueryableStateIntegrationTest (#5988)

--
[...truncated 287.60 KB...]

kafka.utils.UtilsTest > testCircularIterator STARTED

kafka.utils.UtilsTest > testCircularIterator PASSED

kafka.utils.UtilsTest > testReadBytes STARTED

kafka.utils.UtilsTest > testReadBytes PASSED

kafka.utils.UtilsTest > testGetOrElseUpdateAtomically STARTED

kafka.utils.UtilsTest > testGetOrElseUpdateAtomically PASSED

kafka.utils.UtilsTest > testCsvList STARTED

kafka.utils.UtilsTest > testCsvList PASSED

kafka.utils.UtilsTest > testReadInt STARTED

kafka.utils.UtilsTest > testReadInt PASSED

kafka.utils.UtilsTest > testUrlSafeBase64EncodeUUID STARTED

kafka.utils.UtilsTest > testUrlSafeBase64EncodeUUID PASSED

kafka.utils.UtilsTest > testCsvMap STARTED

kafka.utils.UtilsTest > testCsvMap PASSED

kafka.utils.UtilsTest > testInLock STARTED

kafka.utils.UtilsTest > testInLock PASSED

kafka.utils.UtilsTest > testSwallow STARTED

kafka.utils.UtilsTest > testSwallow PASSED

kafka.security.auth.ZkAuthorizationTest > testIsZkSecurityEnabled STARTED

kafka.security.auth.ZkAuthorizationTest > testIsZkSecurityEnabled PASSED

kafka.security.auth.ZkAuthorizationTest > testZkUtils STARTED

kafka.security.auth.ZkAuthorizationTest > testZkUtils PASSED

kafka.security.auth.ZkAuthorizationTest > testZkAntiMigration STARTED

kafka.security.auth.ZkAuthorizationTest > testZkAntiMigration PASSED

kafka.security.auth.ZkAuthorizationTest > testZkMigration STARTED

kafka.security.auth.ZkAuthorizationTest > testZkMigration PASSED

kafka.security.auth.ZkAuthorizationTest > testChroot STARTED

kafka.security.auth.ZkAuthorizationTest > testChroot PASSED

kafka.security.auth.ZkAuthorizationTest > testDelete STARTED

kafka.security.auth.ZkAuthorizationTest > testDelete PASSED

kafka.security.auth.ZkAuthorizationTest > testDeleteRecursive STARTED

kafka.security.auth.ZkAuthorizationTest > testDeleteRecursive PASSED

kafka.security.auth.ResourceTypeTest > testJavaConversions STARTED

kafka.security.auth.ResourceTypeTest > testJavaConversions PASSED

kafka.security.auth.ResourceTypeTest > testFromString STARTED

kafka.security.auth.ResourceTypeTest > testFromString PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testAllowAllAccess STARTED

kafka.security.auth.SimpleAclAuthorizerTest > testAllowAllAccess PASSED

kafka.security.auth.SimpleAclAuthorizerTest > 
testLocalConcurrentModificationOfResourceAcls STARTED

kafka.security.auth.SimpleAclAuthorizerTest > 
testLocalConcurrentModificationOfResourceAcls PASSED

kafka.security.auth.SimpleAclAuthorizerTest > 
testHighConcurrencyDeletionOfResourceAcls STARTED

kafka.security.auth.SimpleAclAuthorizerTest > 
testHighConcurrencyDeletionOfResourceAcls PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testNoAclFound STARTED

kafka.security.auth.SimpleAclAuthorizerTest > testNoAclFound PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testAclInheritance STARTED

kafka.security.auth.SimpleAclAuthorizerTest > testAclInheritance PASSED

kafka.security.auth.SimpleAclAuthorizerTest > 
testDistributedConcurrentModificationOfResourceAcls STARTED

kafka.security.auth.SimpleAclAuthorizerTest > 
testDistributedConcurrentModificationOfResourceAcls PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testAclManagementAPIs STARTED

kafka.security.auth.SimpleAclAuthorizerTest > testAclManagementAPIs PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testWildCardAcls STARTED

kafka.security.auth.SimpleAclAuthorizerTest > testWildCardAcls PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testTopicAcl STARTED

kafka.security.auth.SimpleAclAuthorizerTest > testTopicAcl PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testSuperUserHasAccess STARTED

kafka.security.auth.SimpleAclAuthorizerTest > testSuperUserHasAccess PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testDenyTakesPrecedence STARTED

kafka.security.auth.SimpleAclAuthorizerTest > testDenyTakesPrecedence PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testNoAclFoundOverride STARTED

kafka.security.auth.SimpleAclAuthorizerTest > testNoAclFoundOverride PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testChangeListenerTiming STARTED

kafka.security.auth.SimpleAclAuthorizerTest > testChangeListenerTiming PASSED

kafka.security.auth.SimpleAclAuthorizerTest > 
testHighConcurrencyModificationOfResourceAcls STARTED

kafka.security.auth.SimpleAclAuthorizerTest > 
testHighConcurrencyModificationOfResourceAcls PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testLoadCache STARTED

kafka.security.auth.SimpleAclAuthorizerTest > testLoadCache PASSED

kafka.security.auth.AclTest > testAclJsonConversion STARTED

kafka.security.auth.AclTest > testAclJsonConversion PASSED

kafka.security.auth.PermissionTypeTest > testJavaConversions STARTED

kafka.security.auth.PermissionTypeTest > testJ

[jira] [Created] (KAFKA-7694) Support ZooKeeper based master/secret key management for delegation tokens

2018-12-02 Thread Manikumar (JIRA)
Manikumar created KAFKA-7694:


 Summary:  Support ZooKeeper based master/secret key management for 
delegation tokens
 Key: KAFKA-7694
 URL: https://issues.apache.org/jira/browse/KAFKA-7694
 Project: Kafka
  Issue Type: Sub-task
Reporter: Manikumar


A master/secret key is used to generate and verify delegation tokens. Currently, 
the master key/secret is stored as plain text in the server.properties config file. 
The same key must be configured across all brokers, and a re-deployment is 
required whenever the secret needs to be rotated.

This JIRA is to explore and implement ZooKeeper-based master/secret key 
management to automate secret key generation and expiration.
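
For reference, this is roughly what the current plain-text setup looks like in 
server.properties (the value below is a placeholder; `delegation.token.master.key` 
is the broker property in question):

    # Current approach: the secret lives in server.properties on every broker,
    # and the same value must be configured identically across the cluster.
    delegation.token.master.key=changeit-shared-secret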



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by specifying member id

2018-12-02 Thread Guozhang Wang
Yes, I think it makes sense to let KafkaStreams expose the embedded consumer
client-ids and instance-ids, e.g. in the ThreadMetadata exposed via
`localThreadsMetadata`.

I can open up a new KIP for the change on the Streams side so that we can keep
this KIP scoped to the consumer side only.


Guozhang


On Fri, Nov 30, 2018 at 9:31 PM Boyang Chen  wrote:

> Thanks Guozhang and Mayuresh for the followup here.
> > Also, I was thinking we could have a replace API that takes in a map of
> > old to new instance ids, so that we can replace a consumer.
> > If we have this API, and a consumer host goes down due to hardware
> > issues, we can have another host spin up and take its place. This is
> > like a cold backup, which can be a step towards providing the hot backup
> > that we discussed earlier in the KIP.
> I like Mayuresh's suggestion, and I think we could prepare follow-up work
> once 345 is done to add a replace API. For the
> very first version I feel this is not a must-have.
>
> For Streams, I think we do not need an extra config for the instance id,
> instead, we can re-use the way we construct the embedded consumer's client
> id as:
>
> [streams client-id] + "-StreamThread-" + [thread-id] + "-consumer"
>
> So as long as users specify a unique streams client-id, the resulting
> consumer client-id / instance-id should be unique as well.
>
> So Guozhang, you mean Streams will enable static membership automatically,
> correct? That would make the logic simpler
> and require fewer code changes on the Streams side.
>
> As for the LeaveGroupRequest, as I understand it, your concern is that when
> we are shutting down a single Streams instance that may contain multiple
> threads, shutting down that instance would mean shutting down multiple
> members. Personally I'd prefer to make the LeaveGroupRequest API more
> general and less inclined to Streams (I think Mayuresh also suggested
> this). So I'd suggest that we keep the LeaveGroupRequest API as suggested,
> i.e. a list of member.instance.ids. And in Streams we can add a new API in
> KafkaStreams to expose:
>
> 1) the list of embedded consumer / producer client ids,
> 2) the producer's txn ids if EOS is turned on, and
> 3) the consumer's instance ids.
>
> I agree with the suggestion to make the leave group request change
> generic. So this new Streams API
> will be added on the REST layer to expose the necessary ids, correct?
>
> Looking forward to your confirmation 😊
>
> Best,
> Boyang
>
> --
> *From:* Guozhang Wang 
> *Sent:* Saturday, December 1, 2018 7:00 AM
> *To:* dev
> *Subject:* Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by
> specifying member id
>
> Hi Boyang,
>
> For Streams, I think we do not need an extra config for the instance id,
> instead, we can re-use the way we construct the embedded consumer's client
> id as:
>
> [streams client-id] + "-StreamThread-" + [thread-id] + "-consumer"
>
> So as long as users specify a unique streams client-id, the resulting
> consumer client-id / instance-id should be unique as well.
>
> As for the LeaveGroupRequest, as I understand it, your concern is that when
> we are shutting down a single Streams instance that may contain multiple
> threads, shutting down that instance would mean shutting down multiple
> members. Personally I'd prefer to make the LeaveGroupRequest API more
> general and less inclined to Streams (I think Mayuresh also suggested
> this). So I'd suggest that we keep the LeaveGroupRequest API as suggested,
> i.e. a list of member.instance.ids. And in Streams we can add a new API in
> KafkaStreams to expose:
>
> 1) the list of embedded consumer / producer client ids,
> 2) the producer's txn ids if EOS is turned on, and
> 3) the consumer's instance ids.
>
> So that Streams operators can read those values from KafkaStreams directly
> before shutting it down and use the list in the LeaveGroupRequest API. How
> about that?
>
>
> Guozhang
>
>
> On Fri, Nov 30, 2018 at 7:45 AM Mayuresh Gharat <
> gharatmayures...@gmail.com>
> wrote:
>
> > I like Guozhang's suggestion to not have to wait for the session timeout in
> > case we know that we want to downsize the consumer group and redistribute
> > the partitions among the remaining consumers.
> > IIUC, with the above suggestions, the admin API
> > "removeMemberFromGroup(groupId, list[instanceId])" or
> > "removeMemberFromGroup(groupId, instanceId)" will automatically cause a
> > rebalance, right?
> > I would prefer list[instanceId] because that's the more general scenario.
> >
> > Also, I was thinking we could have a replace API that takes in a map of
> > old to new instance ids, so that we can replace a consumer.
> > If we have this API, and a consumer host goes down due to hardware
> > issues, we can have another host spin up and take its place. This is
> > like a cold backup, which can be a step towards providing the hot backup
> > that we discussed earlier in the KIP.
> > Thoughts?
> >
> > Thanks,
> >
> > Mayuresh
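
To make the ids being discussed above concrete, here is a small Java sketch of the 
embedded consumer client-id construction rule quoted in the thread, plus the shape 
of the admin call proposed by Mayuresh (the application name and thread index are 
made up, and `removeMemberFromGroup` is only the proposal under discussion, not an 
existing AdminClient method):

    import java.util.Arrays;
    import java.util.List;

    public class StaticMembershipIdsSketch {
        public static void main(String[] args) {
            // Embedded consumer client-id, built per the rule quoted above:
            // [streams client-id] + "-StreamThread-" + [thread-id] + "-consumer"
            String streamsClientId = "word-count-app";   // made-up streams client-id
            int threadId = 1;
            String consumerClientId = streamsClientId + "-StreamThread-" + threadId + "-consumer";
            System.out.println(consumerClientId);        // word-count-app-StreamThread-1-consumer

            // Shape of the proposed admin API (hypothetical at this point):
            List<String> instanceIdsToRemove = Arrays.asList(consumerClientId);
            // adminClient.removeMemberFromGroup("word-count-group", instanceIdsToRemove);
        }
    }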

[jira] [Resolved] (KAFKA-7235) Use brokerZkNodeVersion to prevent broker from processing outdated controller request

2018-12-02 Thread Jun Rao (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7235?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-7235.

   Resolution: Fixed
Fix Version/s: 2.2.0

merged to trunk.

> Use brokerZkNodeVersion to prevent broker from processing outdated controller 
> request
> -
>
> Key: KAFKA-7235
> URL: https://issues.apache.org/jira/browse/KAFKA-7235
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Zhanxiang (Patrick) Huang
>Priority: Major
> Fix For: 2.2.0
>
>
> Currently a broker can process controller requests that were sent before the 
> broker was restarted. This could cause a few problems. Here is one example:
> Let's assume partitions p1 and p2 exist on broker1.
> 1) Controller generates a LeaderAndIsrRequest with p1 to be sent to broker1.
> 2) Before the controller sends the request, broker1 is quickly restarted.
> 3) The LeaderAndIsrRequest with p1 is delivered to broker1.
> 4) After processing the first LeaderAndIsrRequest, broker1 starts to 
> checkpoint the high watermark for all partitions that it owns. Thus it may 
> overwrite the high watermark checkpoint file with only the hw for partition p1. 
> The hw for partition p2 is now lost, which could be a problem.
> In general, the correctness of the broker logic currently relies on a few 
> assumptions, e.g. that the first LeaderAndIsrRequest received by a broker should 
> contain all partitions hosted by the broker, which could break if the broker can 
> receive controller requests that were generated before it restarted. 
> One reasonable solution to the problem is to include the 
> expectedBrokerNodeZkVersion in the controller requests. The broker should remember 
> its broker znode zkVersion after it registers itself in ZooKeeper. Then the 
> broker can reject those controller requests whose expectedBrokerNodeZkVersion 
> is different from its broker znode zkVersion.
>  
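
A minimal Java sketch of the guard described above (names are illustrative; the real 
change lives in the Scala broker code): the broker remembers the zkVersion of its own 
znode right after registering in ZooKeeper and rejects controller requests that carry 
a different expected version.

    public class BrokerZkVersionGuard {
        private final int registeredBrokerZkVersion;

        public BrokerZkVersionGuard(int registeredBrokerZkVersion) {
            // zkVersion of this broker's znode, recorded after self-registration in ZooKeeper.
            this.registeredBrokerZkVersion = registeredBrokerZkVersion;
        }

        // A mismatch means the controller request was generated against an older
        // incarnation of this broker and should be rejected.
        public boolean shouldProcess(int expectedBrokerNodeZkVersion) {
            return expectedBrokerNodeZkVersion == registeredBrokerZkVersion;
        }
    }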



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[DISCUSS] KIP-399: Extend ProductionExceptionHandler to cover serialization exceptions

2018-12-02 Thread Kamal Chandraprakash
Hello dev,

  I hope to initiate the discussion for KIP-399: Extend
ProductionExceptionHandler to cover serialization exceptions.

KIP: <
https://cwiki.apache.org/confluence/display/KAFKA/KIP-399%3A+Extend+ProductionExceptionHandler+to+cover+serialization+exceptions
>
JIRA: https://issues.apache.org/jira/browse/KAFKA-7499

All feedback will be highly appreciated.

Thanks,
Kamal Chandraprakash
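
For readers not familiar with the handler being extended: the existing 
`org.apache.kafka.streams.errors.ProductionExceptionHandler` is only invoked on the 
send/ack path, which is why serialization failures are out of its reach today. A 
rough sketch of a custom handler against the existing interface (the logging and 
class name are illustrative):

    import java.util.Map;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.streams.errors.ProductionExceptionHandler;

    public class LogAndContinueProductionExceptionHandler implements ProductionExceptionHandler {

        @Override
        public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
                                                         final Exception exception) {
            // Today this is reached for send errors only, not for serialization exceptions.
            System.err.println("Failed to produce to " + record.topic() + ": " + exception);
            return ProductionExceptionHandlerResponse.CONTINUE;
        }

        @Override
        public void configure(final Map<String, ?> configs) {
            // no-op
        }
    }

Such a handler is registered via the `default.production.exception.handler` Streams 
config; the KIP proposes letting it (or a similar hook) also cover serialization exceptions.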


[jira] [Resolved] (KAFKA-1120) Controller could miss a broker state change

2018-12-02 Thread Jun Rao (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-1120?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-1120.

Resolution: Fixed
  Assignee: Zhanxiang (Patrick) Huang  (was: Mickael Maison)

This is fixed by KAFKA-7235.

> Controller could miss a broker state change 
> 
>
> Key: KAFKA-1120
> URL: https://issues.apache.org/jira/browse/KAFKA-1120
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core
>Affects Versions: 0.8.1
>Reporter: Jun Rao
>Assignee: Zhanxiang (Patrick) Huang
>Priority: Major
>  Labels: reliability
> Fix For: 2.2.0
>
>
> When the controller is in the middle of processing a task (e.g., preferred 
> leader election, broker change), it holds a controller lock. During this 
> time, a broker could have de-registered and re-registered itself in ZK. After 
> the controller finishes processing the current task, it will start processing 
> the logic in the broker change listener. However, it will see no broker 
> change and therefore won't do anything to the restarted broker. This broker 
> will be in a weird state since the controller doesn't inform it to become the 
> leader of any partition. Yet, the cached metadata in other brokers could 
> still list that broker as the leader for some partitions. Client requests 
> routed to that broker will then get a TopicOrPartitionNotExistException. This 
> broker will continue to be in this bad state until it's restarted again.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7692) updateFirstUnstableOffset NPE due to sequenceId overflow in ProducerStateManager.append

2018-12-02 Thread Ming Liu (JIRA)
Ming Liu created KAFKA-7692:
---

 Summary: updateFirstUnstableOffset NPE due to sequenceId overflow 
in ProducerStateManager.append
 Key: KAFKA-7692
 URL: https://issues.apache.org/jira/browse/KAFKA-7692
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 2.0.0
Reporter: Ming Liu
 Fix For: 2.2.0


When operating Kafka, we frequently saw this exception in the Kafka server log:

[2018-06-04 20:44:53,789] ERROR [KafkaServer id=19] Fatal error during 
KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
java.lang.NullPointerException
    at kafka.log.Log.kafka$log$Log$$updateFirstUnstableOffset(Log.scala:792)
    at kafka.log.Log.kafka$log$Log$$loadProducerState(Log.scala:518)
    at kafka.log.Log.<init>(Log.scala:228)
    at kafka.log.Log$.apply(Log.scala:1747)
    at kafka.log.LogManager.kafka$log$LogManager$$loadLog(LogManager.scala:255)
    at kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$11$$anonfun$apply$15$$anonfun$apply$2.apply$mcV$sp(LogManager.scala:335)
    at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:62)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7693) "IllegalArgumentException: Invalid negative sequence number used" in Kafka Client

2018-12-02 Thread Ming Liu (JIRA)
Ming Liu created KAFKA-7693:
---

 Summary: "IllegalArgumentException: Invalid negative sequence 
number used" in Kafka Client
 Key: KAFKA-7693
 URL: https://issues.apache.org/jira/browse/KAFKA-7693
 Project: Kafka
  Issue Type: Bug
  Components: clients
Reporter: Ming Liu


When operating Kafka, we have seen the Kafka client logging this kind of exception:

org.apache.kafka.clients.producer.internals.Sender - [Producer 
clientId=interaction-counter-service-2-dev-by-tweet-id-counting-dest-producer, 
transactionalId=interaction-counter-service-2-dev-by-tweet-id-counting-81-transactional-id-gen-1]
 Uncaught error in kafka producer I/O thread:
java.lang.IllegalArgumentException: Invalid negative sequence number used
    at org.apache.kafka.common.record.MemoryRecordsBuilder.validateProducerState(MemoryRecordsBuilder.java:331)
    at org.apache.kafka.common.record.MemoryRecordsBuilder.close(MemoryRecordsBuilder.java:302)
    at org.apache.kafka.clients.producer.internals.ProducerBatch.close(ProducerBatch.java:407)
    at org.apache.kafka.clients.producer.internals.RecordAccumulator.drain(RecordAccumulator.java:572)
    at org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:270)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:238)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
    at java.lang.Thread.run(Thread.java:748)
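
For context on where a negative value can come from: both this report and KAFKA-7692 point 
at the producer's per-partition sequence number, which is a Java int; a minimal illustration 
of how incrementing an int past Integer.MAX_VALUE silently yields a negative number (plain 
Java, not Kafka code):

    public class SequenceOverflowExample {
        public static void main(String[] args) {
            int sequence = Integer.MAX_VALUE;   // 2147483647, the largest possible int sequence
            int next = sequence + 1;            // integer overflow wraps around silently
            System.out.println(next);           // prints -2147483648, i.e. a negative "sequence"
        }
    }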



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSS] KIP-378: Enable Dependency Injection for Kafka Streams handlers

2018-12-02 Thread Wladimir Schmidt

Hello Guozhang,

Sure, the first approach is very straightforward and allows minimal 
changes to the Kafka Streams API.
On the other hand, the second approach with the interface implementation 
looks cleaner to me.
I totally agree that this should be discussed first before it is 
implemented.


Thanks,

Wladimir


On 17-Nov-18 23:37, Guozhang Wang wrote:

Hello folks,

I'd like to revive this thread for discussion. After reading the previous
emails, I think I'm still leaning a bit towards re-enabling passing a
StreamsConfig to Kafka Streams constructors, compared with adding a
ConfiguredStreamsFactory as an additional parameter to overloaded
KafkaStreams constructors: although the former seems less clean, as it
requires users to read through the usage of AbstractConfig to know how to
use it in their frameworks, this to me is a solvable problem through
documentation; plus, AbstractConfig is a public interface already, and hence
the additional ConfiguredStreamsFactory to me really overlaps a bit
in functionality.


Guozhang



On Sun, Oct 21, 2018 at 1:41 PM Wladimir Schmidt  wrote:


Hi Damian,

The first approach was added only because it had been initially proposed
in my pull request,
which started a discussion and thus KIP-378 was born.

Yes, I would like to have something "injectable". In this regard, a
`ConfiguredStreamsFactory` (the name is subject to discussion)
is a good option to be introduced into the `KafkaStreams` constructor.

Even though I consider the second approach to be cleaner, it involves a
certain amount of refactoring of the Streams library.
The first approach, on the contrary, only adds additional constructors
(or removes the deprecated annotation, if the method has not been removed
yet) with considerably less intervention into the Streams library
(no changes that would break the API; please see the pull request:
https://github.com/apache/kafka/pull/5344).

Thanks
Wladimir

On 10-Oct-18 15:51, Damian Guy wrote:

Hi Wladimir,

Of the two approaches in the KIP, I feel the second approach is cleaner.
However, am I correct in assuming that you want to have the
`ConfiguredStreamsFactory` as a ctor arg in `StreamsConfig` so that Spring
can inject this for you?

Otherwise you could just put the ApplicationContext as a property in the
config and then use that via the configure method of the appropriate
handler to get your actual handler.

Thanks,
Damian

On Tue, 9 Oct 2018 at 01:55, Guozhang Wang  wrote:


John, thanks for the explanation, now it makes much more sense to me.

As for the concrete approach, to me it seems the first option requires fewer
changes than the second (ConfiguredStreamsFactory based) approach,
whereas the second one requires an additional interface that overlaps with
the AbstractConfig.

I'm aware that in KafkaProducer / KafkaConsumer we do not have public
constructors taking a ProducerConfig or ConsumerConfig directly; could
anyone using Spring share how you've worked around it so far? If it is
very awkward, I'm not against just adding the XXXConfigs to the constructors
directly.
Guozhang

On Fri, Oct 5, 2018 at 1:48 PM, John Roesler  wrote:


Hi Wladimir,

Thanks for the KIP!

As I mentioned in the PR discussion, I personally prefer not to recommend
overriding StreamsConfig for this purpose.

It seems like a person wishing to create a DI shim would have to acquire
quite a deep understanding of the class and its usage to figure out what
exactly to override to accomplish their goals without breaking everything.

I'm honestly impressed with the method you came up with to create your
Spring/Streams shim.

I think we can make the path for the next person smoother by going with
something more akin to the ConfiguredStreamsFactory. This is a constrained
interface that tells you exactly what you have to implement to create such
a shim.

A few thoughts:
1. it seems like we can keep all the deprecated constructors still
deprecated

2. we could add just one additional constructor to each of KafkaStreams and
TopologyTestDriver to still take a Properties, but also your new
ConfiguredStreamsFactory

3. I don't know if I'm sold on the name ConfiguredStreamsFactory, since it
does not produce configured streams. Instead, it produces configured
instances... How about ConfiguredInstanceFactory?

4. if I understand the usage correctly, it's actually a pretty small number
of classes that we actually make via getConfiguredInstance. Offhand, I can
think of the key/value Serdes, the deserialization exception handler, and
the production exception handler.
Perhaps, instead of maintaining a generic "class instantiator", we could
explore a factory interface that just has methods for creating exactly the
kinds of things we need to create. In fact, we already have something like
this: org.apache.kafka.streams.KafkaClientSupplier. Do you think we could
just add some more methods to that interface (and maybe rename it) instead?

Thanks,
-John

On Fri, Oc
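
To make points 3 and 4 above concrete, one possible shape of such a factory, sketched purely 
for illustration (this interface does not exist in Kafka; the name and methods just echo the 
discussion above):

    import org.apache.kafka.common.serialization.Serde;
    import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
    import org.apache.kafka.streams.errors.ProductionExceptionHandler;

    // Hypothetical sketch only -- not an existing Kafka interface.
    public interface ConfiguredInstanceFactory {
        Serde<?> keySerde();
        Serde<?> valueSerde();
        DeserializationExceptionHandler deserializationExceptionHandler();
        ProductionExceptionHandler productionExceptionHandler();
    }

A DI container such as Spring could implement this interface and hand it to an overloaded 
KafkaStreams constructor, which is essentially the shim the thread is discussing.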

Re: [DISCUSS] KIP-390: Add producer option to adjust compression level

2018-12-02 Thread Ismael Juma
The updated title sounds fine to me.

Ismael

On Sun, Dec 2, 2018, 5:25 AM Dongjin Lee wrote:
> Hi Ismael,
>
> Got it. Your direction is perfectly reasonable. I am now updating the KIP
> document and the implementation.
>
> If we allow the buffer/block size to be configurable, it would be better to
> update the title of the KIP to something like 'Allow fine-grained configuration
> for compression'. Is that right?
>
> @Other committers:
>
> Is there any other opinion on allowing the buffer/block size to be
> configurable?
>
> Thanks,
> Dongjin
>
> On Thu, Nov 29, 2018 at 1:45 AM Ismael Juma  wrote:
>
> > Hi Dongjin,
> >
> > To clarify, I mean a broker topic config with regards to point 1. As you
> > know, compression can be done by the producer and/or by the broker. The
> > default is for the broker to just use whatever compression was used by
> the
> > producer, but this can be changed by the user on a per topic basis. It
> > seems like it would make sense for the configs to be consistent between
> > producer and broker.
> >
> > For point 2, I haven't looked at the implementation, but we could do it
> in
> > the `CompressionType` enum by invoking the right constructor or
> retrieving
> > the default value via a constant (if defined). That's an implementation
> > detail and can be discussed in the PR. The more general point is to rely
> on
> > the library defaults instead of choosing one ourselves.
> >
> > For point 3, I'm in favour of doing that in this KIP.
> >
> > Ismael
> >
> > On Wed, Nov 28, 2018 at 7:01 AM Dongjin Lee  wrote:
> >
> > > Thank you Ismael, here are the answers:
> > >
> > > *1. About topic config*
> > >
> > > After some consideration, I concluded that topic config doesn't need to
> > > support compression.level. Here is why: since the compression is
> > conducted
> > > by the client, the one who can select the best compression level is the
> > > client itself. Let us assume that the compression level is set at the
> > topic
> > > config level. In that case, there is a possibility that the compression
> > > level is not optimal for some producers. Actually, Kafka's go client
> also
> > > supports compression level functionality for the producer config only.
> > >  (wait, do we
> > > need
> > > to add this reasoning in the KIP, rejected alternatives section?)
> > >
> > > *2. About default level*
> > >
> > > As of current draft implementation, the default compression is set on
> the
> > > CompressionType enum. Of course, changing this strategy into relying
> on a
> > > method from the library to pick the default compression level seems
> > > possible, like `GZIPBlockOutputStream` does. In this case, we need to
> add
> > > similar wrapper class for zstd and modify lz4 the wrapper also. Add to
> > > this, it seems like we need to explicitly state that we follow the
> > default
> > > compression level of the codec in the documentation. Is this what you
> > > intended?
> > >
> > > *3. Whether to allow the buffer/block size to be configurable*
> > >
> > > Well, As of current draft implementation, the lz4 level is implemented
> as
> > > block size; this is caused by my misunderstanding on lz4. After
> reviewing
> > > lz4 today, I found that it also supports compression level of 1~16
> > > (default: 1), not block size. I will fix it in this weekend by updating
> > the
> > > wrapper class.
> > >
> > > For the problem of the buffer/block size, I have no strong opinion. If
> > the
> > > community needs it, I will do it all together. What do you think?
> > >
> > > In short, it seems like I need to update the KIP document for issue #1
> > and
> > > update the compression wrapper for issue #2, #3. Is this okay?
> > >
> > > Thanks,
> > > Dongjin
> > >
> > > On Wed, Nov 28, 2018 at 12:34 AM Ismael Juma 
> wrote:
> > >
> > > >  Thanks for the KIP, this is helpful. A few questions:
> > > >
> > > > 1. Have we considered whether we want to allow a similar topic
> config?
> > > > 2. Can we rely on a method from the library to pick the default
> > > compression
> > > > level if compression.level is not set? We do it for gzip and it would
> > > seem
> > > > reasonable to do something similar for the other compression
> libraries.
> > > > 3. Do we want to allow the buffer/block size to be configurable? This
> > has
> > > > an impact on memory usage and people may want to trade compression
> for
> > > > less/more memory in some cases. For example, the default for LZ4 is
> > 64KB
> > > > which is a bit high.
> > > >
> > > > Ismael
> > > >
> > > > On Sun, Nov 18, 2018, 2:07 PM Dongjin Lee wrote:
> > > > > Hello dev,
> > > > >
> > > > > I hope to initiate the discussion of KIP-390: Add producer option
> to
> > > > adjust
> > > > > compression level
> > > > > <
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-390%3A+Add+producer+option+to+adjust+compression+level
> > > > > >.
> > > > > All feedbacks will be highly appreciated.
> > > > >
> > > > > 

Re: [DISCUSS] KIP-390: Add producer option to adjust compression level

2018-12-02 Thread Dongjin Lee
Hi Ismael,

Got it. Your direction is perfectly reasonable. I am now updating the KIP
document and the implementation.

If we allow the buffer/block size to be configurable, it would be better to
update the title of the KIP to something like 'Allow fine-grained configuration
for compression'. Is that right?

@Other committers:

Is there any other opinion on allowing the buffer/block size to be
configurable?

Thanks,
Dongjin

On Thu, Nov 29, 2018 at 1:45 AM Ismael Juma  wrote:

> Hi Dongjin,
>
> To clarify, I mean a broker topic config with regards to point 1. As you
> know, compression can be done by the producer and/or by the broker. The
> default is for the broker to just use whatever compression was used by the
> producer, but this can be changed by the user on a per topic basis. It
> seems like it would make sense for the configs to be consistent between
> producer and broker.
>
> For point 2, I haven't looked at the implementation, but we could do it in
> the `CompressionType` enum by invoking the right constructor or retrieving
> the default value via a constant (if defined). That's an implementation
> detail and can be discussed in the PR. The more general point is to rely on
> the library defaults instead of choosing one ourselves.
>
> For point 3, I'm in favour of doing that in this KIP.
>
> Ismael
>
> On Wed, Nov 28, 2018 at 7:01 AM Dongjin Lee  wrote:
>
> > Thank you Ismael, here are the answers:
> >
> > *1. About topic config*
> >
> > After some consideration, I concluded that topic config doesn't need to
> > support compression.level. Here is why: since the compression is
> conducted
> > by the client, the one who can select the best compression level is the
> > client itself. Let us assume that the compression level is set at the
> topic
> > config level. In that case, there is a possibility that the compression
> > level is not optimal for some producers. Actually, Kafka's go client also
> > supports compression level functionality for the producer config only.
> >  (wait, do we
> > need
> > to add this reasoning in the KIP, rejected alternatives section?)
> >
> > *2. About default level*
> >
> > As of current draft implementation, the default compression is set on the
> > CompressionType enum. Of course, changing this strategy into relying on a
> > method from the library to pick the default compression level seems
> > possible, like `GZIPBlockOutputStream` does. In this case, we need to add
> > similar wrapper class for zstd and modify lz4 the wrapper also. Add to
> > this, it seems like we need to explicitly state that we follow the
> default
> > compression level of the codec in the documentation. Is this what you
> > intended?
> >
> > *3. Whether to allow the buffer/block size to be configurable*
> >
> > Well, As of current draft implementation, the lz4 level is implemented as
> > block size; this is caused by my misunderstanding on lz4. After reviewing
> > lz4 today, I found that it also supports compression level of 1~16
> > (default: 1), not block size. I will fix it in this weekend by updating
> the
> > wrapper class.
> >
> > For the problem of the buffer/block size, I have no strong opinion. If
> the
> > community needs it, I will do it all together. What do you think?
> >
> > In short, it seems like I need to update the KIP document for issue #1
> and
> > update the compression wrapper for issue #2, #3. Is this okay?
> >
> > Thanks,
> > Dongjin
> >
> > On Wed, Nov 28, 2018 at 12:34 AM Ismael Juma  wrote:
> >
> > >  Thanks for the KIP, this is helpful. A few questions:
> > >
> > > 1. Have we considered whether we want to allow a similar topic config?
> > > 2. Can we rely on a method from the library to pick the default
> > compression
> > > level if compression.level is not set? We do it for gzip and it would
> > seem
> > > reasonable to do something similar for the other compression libraries.
> > > 3. Do we want to allow the buffer/block size to be configurable? This
> has
> > > an impact on memory usage and people may want to trade compression for
> > > less/more memory in some cases. For example, the default for LZ4 is
> 64KB
> > > which is a bit high.
> > >
> > > Ismael
> > >
> > > On Sun, Nov 18, 2018, 2:07 PM Dongjin Lee wrote:
> > > > Hello dev,
> > > >
> > > > I hope to initiate the discussion of KIP-390: Add producer option to
> > > adjust
> > > > compression level
> > > > <
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-390%3A+Add+producer+option+to+adjust+compression+level
> > > > >.
> > > > All feedbacks will be highly appreciated.
> > > >
> > > > Best,
> > > > Dongjin
> > > >
> > > > --
> > > > *Dongjin Lee*
> > > >
> > > > *A hitchhiker in the mathematical world.*
> > > >
> > > > *github:  github.com/dongjinleekr
> > > > linkedin:
> > > kr.linkedin.com/in/dongjinleekr
> > > > slideshare:
> > > > www.slid
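
To ground the discussion, a sketch of how the proposed option would sit next to the existing 
producer config (`compression.type` exists today; `compression.level` is only the KIP-390 
proposal, and the value shown is arbitrary):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.ProducerConfig;

    public class CompressionLevelSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip"); // existing config
            props.put("compression.level", "9");                       // proposed by KIP-390, not an existing config
        }
    }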

Re: [DISCUSS] KIP-394: Require member.id for initial join group request

2018-12-02 Thread Boyang Chen
In fact, it's probably better to move KIP-394 to the vote stage first, so that 
it's easier to finalize the timeline and smooth the rollout plan for KIP-345. 
Jason and Stanislav, since you two are the most involved in this KIP, could you 
let me know if there is still anything unclear that we want to resolve before 
moving to a vote?

Best,
Boyang

From: Boyang Chen 
Sent: Saturday, December 1, 2018 10:53 AM
To: dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-394: Require member.id for initial join group request

Thanks Jason for the reply! Since the overall motivation and design are pretty 
clear, I will go ahead and start the implementation, and we can discuss the 
underlying details in the PR.

Best,
Boyang

From: Matthias J. Sax 
Sent: Saturday, December 1, 2018 3:12 AM
To: dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-394: Require member.id for initial join group request

SGTM.

On 11/30/18 10:17 AM, Jason Gustafson wrote:
> Using the session expiration logic we already have seems like the simplest
> option (this is probably a one or two line change). The rejoin should be
> quick anyway, so I don't think it's worth optimizing for unjoined new
> members. Just my two cents. This is more of an implementation detail, so
> need not necessarily be resolved here.
>
> -Jason
>
> On Fri, Nov 30, 2018 at 12:56 AM Boyang Chen  wrote:
>
>> Thanks Matthias for the question. I'm thinking of having a separate hash
>> set called `registeredMemberIds` which
>> will be cleared out every time a group finishes one round of rebalance.
>> Since storing one id is pretty trivial, using
>> purgatory to track the id removal is a bit wasteful in my opinion.
>> 
>> From: Matthias J. Sax 
>> Sent: Friday, November 30, 2018 10:26 AM
>> To: dev@kafka.apache.org
>> Subject: Re: [DISCUSS] KIP-394: Require member.id for initial join group
>> request
>>
>> Thanks! Makes sense.
>>
>> I missed the fact that the member is only added on the second
>> joinGroup request, i.e. the one that contains the `member.id`.
>>
>> However, it seems there is another race condition for this design:
>>
>> If two consumers join at the same time, it is possible that the broker
>> assigns the same `member.id` to both (because none of them have joined
>> the group yet--i.e., the second joinGroup request has not been sent yet--the
>> `member.id` is not stored broker-side yet and the broker cannot check for
>> duplicates when creating a new `member.id`).
>>
>> The probability might be fairly low though. However, what Stanislav
>> proposed, to add the `member.id` directly and remove it after
>> `session.timeout.ms`, sounds like a safe option that avoids this issue.
>>
>> Thoughts?
>>
>>
>> -Matthias
>>
>> On 11/28/18 8:15 PM, Boyang Chen wrote:
>>> Thanks Matthias for the question, and Stanislav for the explanation!
>>>
>>> For the scenario described, we will never let a member join the
>> GroupMetadata map
>>> if it uses UNKNOWN_MEMBER_ID. So the workflow will be like this:
>>>
>>>   1.  Group is empty. Consumer c1 started. Join with UNKNOWN_MEMBER_ID;
>>>   2.  Broker rejects while allocating a member.id to c1 in response (c1
>> protocol version is current);
>>>   3.  c1 handles the error and rejoins with assigned member.id;
>>>   4.  Broker stores c1 in its group metadata;
>>>   5.  Consumer c2 started. Join with UNKNOWN_MEMBER_ID;
>>>   6.  Broker rejects while allocating a member.id to c2 in response (c2
>> protocol version is current);
>>>   7.  c2 fails to get the response/crashes in the middle;
>>>   8.  After certain time, c2 restarts a join request with
>> UNKNOWN_MEMBER_ID;
>>>
>>> As you can see, c2 will repeat steps 6~8 until it successfully sends back a
>>> join group request with an allocated id.
>>> By then the broker will include c2 within the group metadata map.
>>>
>>> Does this sound clear to you?
>>>
>>> Best,
>>> Boyang
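
A compact sketch of the broker-side decision described in steps 1-8 above (illustrative Java, 
not the actual group coordinator code; the expiration of unused ids after `session.timeout.ms` 
is the follow-up detail discussed below):

    import java.util.HashSet;
    import java.util.Set;
    import java.util.UUID;

    public class FirstJoinSketch {
        private static final String UNKNOWN_MEMBER_ID = "";
        private final Set<String> allocatedMemberIds = new HashSet<>();

        // Returns the member.id the client must use. A first join with an unknown id is
        // "rejected" with a newly allocated id (the MEMBER_ID_REQUIRED path); only the
        // rejoin that carries this id is admitted into the group metadata.
        public String onJoin(String memberId) {
            if (UNKNOWN_MEMBER_ID.equals(memberId)) {
                String newId = UUID.randomUUID().toString();
                allocatedMemberIds.add(newId); // tracked so it can be expired if the client never rejoins
                return newId;                  // client retries the join with this id
            }
            return memberId;                   // known id: admit the member
        }
    }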
>>> 
>>> From: Stanislav Kozlovski 
>>> Sent: Wednesday, November 28, 2018 7:39 PM
>>> To: dev@kafka.apache.org
>>> Subject: Re: [DISCUSS] KIP-394: Require member.id for initial join
>> group request
>>>
>>> Hey Matthias,
>>>
>>> I think the notion is to have the `session.timeout.ms` start ticking
>>> when the broker responds with the member.id. Then, the broker would
>>> properly expire consumers and not hold on to too many stale ones.
>>> This isn't mentioned in the KIP though so it is worth to wait for Boyang
>> to
>>> confirm
>>>
>>> On Wed, Nov 28, 2018 at 3:10 AM Matthias J. Sax 
>>> wrote:
>>>
 Thanks for the KIP Boyang.

 I guess I am missing something, but I am still learning more details
 about the rebalance protocol, so maybe you can help me out?

 Assume a client sends UNKNOWN_MEMBER_ID in its first joinGroup request.
 The broker generates a `member.id` and sends it back via
 `MEMBER_ID_REQUIRED` error respons